A sharded cluster is the most difficult type of deployment to administer. This chapter gives advice on performing administrative tasks on all parts of a cluster, including:
Inspecting the cluster’s state: who its members are, where data is held, and what connections are open
How to add, remove, and change members of a cluster
Administering data movement and manually moving data
There are several helpers available to find out what data is where, what the shards are, and what the cluster is doing.
sh.status() gives you an overview of your shards, databases, and sharded collections. If you have a small number of chunks, it will print a breakdown of which chunks are where as well. Otherwise it will simply give the collection’s shard key and how many chunks each shard has:
> sh.status()
--- Sharding Status ---
  sharding version: { "_id" : 1, "version" : 3 }
  shards:
    { "_id" : "shard0000", "host" : "localhost:30000", "tags" : [ "USPS", "Apple" ] }
    { "_id" : "shard0001", "host" : "localhost:30001" }
    { "_id" : "shard0002", "host" : "localhost:30002", "tags" : [ "Apple" ] }
  databases:
    { "_id" : "admin", "partitioned" : false, "primary" : "config" }
    { "_id" : "test", "partitioned" : true, "primary" : "shard0001" }
      test.foo
        shard key: { "x" : 1, "y" : 1 }
        chunks:
          shard0000 4
          shard0002 4
          shard0001 4
        { "x" : { $minKey : 1 }, "y" : { $minKey : 1 } } -->>
              { "x" : 0, "y" : 10000 } on : shard0000
        { "x" : 0, "y" : 10000 } -->>
              { "x" : 12208, "y" : -2208 } on : shard0002
        { "x" : 12208, "y" : -2208 } -->>
              { "x" : 24123, "y" : -14123 } on : shard0000
        { "x" : 24123, "y" : -14123 } -->>
              { "x" : 39467, "y" : -29467 } on : shard0002
        { "x" : 39467, "y" : -29467 } -->>
              { "x" : 51382, "y" : -41382 } on : shard0000
        { "x" : 51382, "y" : -41382 } -->>
              { "x" : 64897, "y" : -54897 } on : shard0002
        { "x" : 64897, "y" : -54897 } -->>
              { "x" : 76812, "y" : -66812 } on : shard0000
        { "x" : 76812, "y" : -66812 } -->>
              { "x" : 92793, "y" : -82793 } on : shard0002
        { "x" : 92793, "y" : -82793 } -->>
              { "x" : 119599, "y" : -109599 } on : shard0001
        { "x" : 119599, "y" : -109599 } -->>
              { "x" : 147099, "y" : -137099 } on : shard0001
        { "x" : 147099, "y" : -137099 } -->>
              { "x" : 173932, "y" : -163932 } on : shard0001
        { "x" : 173932, "y" : -163932 } -->>
              { "x" : { $maxKey : 1 }, "y" : { $maxKey : 1 } } on : shard0001
      test.ips
        shard key: { "ip" : 1 }
        chunks:
          shard0000 2
          shard0002 3
          shard0001 3
        { "ip" : { $minKey : 1 } } -->>
              { "ip" : "002.075.101.096" } on : shard0000
        { "ip" : "002.075.101.096" } -->>
              { "ip" : "022.089.076.022" } on : shard0002
        { "ip" : "022.089.076.022" } -->>
              { "ip" : "038.041.058.074" } on : shard0002
        { "ip" : "038.041.058.074" } -->>
              { "ip" : "055.081.104.118" } on : shard0002
        { "ip" : "055.081.104.118" } -->>
              { "ip" : "072.034.009.012" } on : shard0000
        { "ip" : "072.034.009.012" } -->>
              { "ip" : "090.118.120.031" } on : shard0001
        { "ip" : "090.118.120.031" } -->>
              { "ip" : "127.126.116.125" } on : shard0001
        { "ip" : "127.126.116.125" } -->>
              { "ip" : { $maxKey : 1 } } on : shard0001
        tag: Apple  { "ip" : "017.000.000.000" } -->> { "ip" : "018.000.000.000" }
        tag: USPS   { "ip" : "056.000.000.000" } -->> { "ip" : "057.000.000.000" }
    { "_id" : "test2", "partitioned" : false, "primary" : "shard0002" }
Once there are more than a few chunks, sh.status() will summarize the chunk stats instead of printing each chunk. To see all chunks, run sh.status(true) (the true tells sh.status() to be verbose).
All the information sh.status() shows is gathered from your config database. sh.status() runs a MapReduce to collect this data, so you cannot run sh.status() when using the --noscripting option.
All of the configuration information about your cluster is kept in collections in the config database on the config servers. The shell has several helpers for exposing this information in a more readable way, but you can always query the config database directly for metadata about your cluster.
Never connect directly to your config servers, as you do not want to take the chance of accidentally changing or removing config server data. Instead, connect to the mongos and use the config database to see its data, as you would for any other database:
mongos> use config
If you manipulate config data through mongos (instead of connecting directly to the config servers), mongos will ensure that all of your config servers stay in sync and prevent various dangerous actions like accidentally dropping the config database.
In general, you should not directly change any data in the config database (exceptions are noted below). If you do change anything, you will generally have to restart all of your mongos servers to see the effect.
There are several collections in the config database. This section covers what each one contains and how it can be used.
The shards collection keeps track of all the shards in the cluster. A typical document in the shards collection might look something like this:
> db.shards.findOne()
{
    "_id" : "spock",
    "host" : "spock/server-1:27017,server-2:27017,server-3:27017",
    "tags" : [ "us-east", "64gb mem", "cpu3" ]
}
The shard’s "_id" is picked up from the replica set name, so each replica set in your cluster must have a unique name. When you update your replica set configuration (e.g., adding or removing members), the "host" field will be updated automatically.
The databases collection keeps track of all of the databases, sharded and non-sharded, that the cluster knows about:
> db.databases.find()
{ "_id" : "admin", "partitioned" : false, "primary" : "config" }
{ "_id" : "test1", "partitioned" : true, "primary" : "spock" }
{ "_id" : "test2", "partitioned" : false, "primary" : "bones" }
If enableSharding has been run on a database, "partitioned" will be true. The "primary" is the database’s “home base.” By default, all new collections in that database will be created on the database’s primary shard.
The collections collection keeps track of all sharded collections (non-sharded collections are not shown). A typical document looks something like this:
> db.collections.findOne()
{
    "_id" : "test.foo",
    "lastmod" : ISODate("1970-01-16T17:53:52.934Z"),
    "dropped" : false,
    "key" : { "x" : 1, "y" : 1 },
    "unique" : true
}
The important fields are:
"_id"
The namespace of the collection.
"key"
The shard key. In this case, it is a compound key on "x" and "y".
"unique"
Indicates that the shard key is a unique index. This field is not displayed unless it is true (the shard key is unique). By default, the shard key is not unique.
The chunks collection keeps a record of each chunk in all the collections. A typical document in the chunks collection might look something like this:
{
    "_id" : "test.hashy-user_id_-1034308116544453153",
    "lastmod" : { "t" : 5000, "i" : 50 },
    "lastmodEpoch" : ObjectId("50f5c648866900ccb6ed7c88"),
    "ns" : "test.hashy",
    "min" : { "user_id" : NumberLong("-1034308116544453153") },
    "max" : { "user_id" : NumberLong("-732765964052501510") },
    "shard" : "test-rs2"
}
The most useful fields are:
"_id"
The unique identifier for the chunk. Generally this is the namespace, shard key, and lower chunk boundary.
"ns"
The collection that this chunk is from.
"min"
The smallest value in the chunk’s range (inclusive).
"max"
All values in the chunk are smaller than this value.
"shard"
Which shard the chunk resides on.
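A chunk’s range can be reasoned about directly from these fields: the "min" bound is inclusive and the "max" bound is exclusive. A minimal sketch in plain Python (the helper name is hypothetical, and a single-field shard key is assumed for simplicity):

```python
def in_chunk(chunk, doc):
    """Return True if doc's shard-key value falls in the chunk's range.

    A chunk covers [min, max): "min" is inclusive, "max" exclusive,
    matching the field descriptions above. Assumes a single-field
    shard key.
    """
    (field, lo), = chunk["min"].items()
    hi = chunk["max"][field]
    return lo <= doc[field] < hi

# Hypothetical chunk document shaped like the config.chunks example:
chunk = {"min": {"user_id": -1034308116544453153},
         "max": {"user_id": -732765964052501510}}

print(in_chunk(chunk, {"user_id": -1034308116544453153}))  # True (lower bound)
print(in_chunk(chunk, {"user_id": -732765964052501510}))   # False (upper bound)
```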
The "lastmod" and "lastmodEpoch" fields are used to track chunk versioning. For example, if a chunk "foo.bar-_id-1" split into two chunks, we’d want a way of distinguishing the new, smaller "foo.bar-_id-1" chunk from its previous incarnation. Thus, the "t" and "i" fields are the major and minor chunk versions: major versions change when a chunk is migrated to a new shard and minor versions change when a chunk is split.
sh.status() uses the config.chunks collection to gather most of its information.
The changelog collection is useful for keeping track of what a cluster is doing, since it records all of the splits and migrates that have occurred.
Splits are recorded in a document that looks like this:
{
    "_id" : "router1-2013-02-09T18:08:12-5116908cab10a03b0cd748c3",
    "server" : "spock-01",
    "clientAddr" : "10.3.1.71:62813",
    "time" : ISODate("2013-02-09T18:08:12.574Z"),
    "what" : "split",
    "ns" : "test.foo",
    "details" : {
        "before" : {
            "min" : { "x" : { $minKey : 1 }, "y" : { $minKey : 1 } },
            "max" : { "x" : { $maxKey : 1 }, "y" : { $maxKey : 1 } },
            "lastmod" : Timestamp(1000, 0),
            "lastmodEpoch" : ObjectId("000000000000000000000000")
        },
        "left" : {
            "min" : { "x" : { $minKey : 1 }, "y" : { $minKey : 1 } },
            "max" : { "x" : 0, "y" : 10000 },
            "lastmod" : Timestamp(1000, 1),
            "lastmodEpoch" : ObjectId("000000000000000000000000")
        },
        "right" : {
            "min" : { "x" : 0, "y" : 10000 },
            "max" : { "x" : { $maxKey : 1 }, "y" : { $maxKey : 1 } },
            "lastmod" : Timestamp(1000, 2),
            "lastmodEpoch" : ObjectId("000000000000000000000000")
        }
    }
}
The "details" give information about what the original chunk looked like and what it split into. This output is what the first chunk split of a collection looks like. Note that each new chunk has its minor version incremented: "lastmod" is Timestamp(1000, 1) and Timestamp(1000, 2), respectively.
Migrates are a bit more complicated and actually create four separate changelog documents: one noting the start of the migrate, one for the "from" shard, one for the "to" shard, and one for the migrate’s commit (when the migration is finalized). The middle two documents are of interest because these give a breakdown of how long each step in the process took. This can give you an idea whether it’s the disk, network, or something else that is causing a bottleneck on migrates.
For example, the document created by the "from" shard looks like this:
{
    "_id" : "router1-2013-02-09T18:15:14-5116923271b903e42184211c",
    "server" : "spock-01",
    "clientAddr" : "10.3.1.71:27017",
    "time" : ISODate("2013-02-09T18:15:14.388Z"),
    "what" : "moveChunk.to",
    "ns" : "test.foo",
    "details" : {
        "min" : { "x" : 24123, "y" : -14123 },
        "max" : { "x" : 39467, "y" : -29467 },
        "step1 of 5" : 0,
        "step2 of 5" : 0,
        "step3 of 5" : 900,
        "step4 of 5" : 0,
        "step5 of 5" : 142
    }
}
Each of the steps listed in "details" is timed and the "stepN of 5" messages show how long each step took, in milliseconds. When the "from" shard receives a moveChunk command from the mongos, it:
1. Checks the command parameters.
2. Confirms with the config servers that it can acquire a distributed lock for the migrate.
3. Tries to contact the "to" shard.
4. Copies the data. This is referred to and logged as “the critical section.”
5. Coordinates with the "to" shard and config servers to confirm the migrate.
Note that the "to" and "from" shards must be in close communication starting at "step4 of 5": the shards directly talk to one another and the config server to perform the migration. If the "from" server has flaky network connectivity during the final steps, it may end up in a state where it cannot undo the migrate and cannot move forward with it. In this case, the mongod will shut down.
The "to" shard’s changelog document is similar to the "from" shard’s, but the steps are a bit different. It looks like:
{
    "_id" : "router1-2013-02-09T18:15:14-51169232ab10a03b0cd748e5",
    "server" : "spock-01",
    "clientAddr" : "10.3.1.71:62813",
    "time" : ISODate("2013-02-09T18:15:14.391Z"),
    "what" : "moveChunk.from",
    "ns" : "test.foo",
    "details" : {
        "min" : { "x" : 24123, "y" : -14123 },
        "max" : { "x" : 39467, "y" : -29467 },
        "step1 of 6" : 0,
        "step2 of 6" : 2,
        "step3 of 6" : 33,
        "step4 of 6" : 1032,
        "step5 of 6" : 12,
        "step6 of 6" : 0
    }
}
When the "to" shard receives a command from the "from" shard, it:
1. Migrates indexes. If this shard has never held chunks from the migrated collection before, it needs to know what fields are indexed. If this isn’t the first time a chunk from this collection is being moved to this shard, then this should be a no-op.
2. Deletes any existing data in the chunk range. There might be data left over from a failed migration or restore procedure, which we wouldn’t want to interfere with the current data.
3. Copies all documents in the chunk to the "to" shard.
4. Replays any operations that happened to these documents during the copy (on the "to" shard).
5. Waits for the "to" shard to have replicated the newly migrated data to a majority of servers.
6. Commits the migrate by changing the chunk’s metadata to say that it lives on the "to" shard.
The tags collection is created if you configure shard tags for your system. Each tag is associated with a chunk range:
> db.tags.find()
{
    "_id" : { "ns" : "test.ips", "min" : { "ip" : "056.000.000.000" } },
    "ns" : "test.ips",
    "min" : { "ip" : "056.000.000.000" },
    "max" : { "ip" : "057.000.000.000" },
    "tag" : "USPS"
}
{
    "_id" : { "ns" : "test.ips", "min" : { "ip" : "017.000.000.000" } },
    "ns" : "test.ips",
    "min" : { "ip" : "017.000.000.000" },
    "max" : { "ip" : "018.000.000.000" },
    "tag" : "Apple"
}
The settings collection contains documents representing the current balancer settings and chunk size. By changing the documents in this collection, you can turn the balancer on or off or change the chunk size. Note that you should always connect to mongos, not the config servers directly, to change values in this collection.
There are a lot of connections between the components of a cluster. This section covers some sharding-specific information. See Chapter 23 for more information on networking.
There is a command, connPoolStats, for finding out connection information about mongoses and mongods. This gives you information about how many connections a server has open, and to what:
> db.adminCommand({"connPoolStats" : 1})
{
    "createdByType" : { "sync" : 857, "set" : 4 },
    "numDBClientConnection" : 35,
    "numAScopedConnection" : 0,
    "hosts" : {
        "config-01:10005,config-02:10005,config-03:10005" : {
            "created" : 857,
            "available" : 2
        },
        "spock/spock-01:10005,spock-02:10005,spock-03:10005" : {
            "created" : 4,
            "available" : 1
        }
    },
    "totalAvailable" : 3,
    "totalCreated" : 861,
    "ok" : 1
}
Hosts of the form "host1,host2,host3" are config server connections, also known as “sync” connections. Hosts that look like "name/host1,host2,...,hostN" are connections to shards. The "available" counts are how many connections are currently available in the connection pools on this instance.
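The two host-name forms can be told apart mechanically by the "setName/" prefix. A small illustrative sketch (the helper is hypothetical, not a shell built-in):

```python
def split_pools(conn_pool_stats):
    """Split connPoolStats "hosts" entries into shard pools
    ("setName/host1,host2,...") and config-server pools
    ("host1,host2,host3"), per the naming convention above."""
    shards, config = {}, {}
    for host, stats in conn_pool_stats["hosts"].items():
        (shards if "/" in host else config)[host] = stats
    return shards, config

# Hosts from the example connPoolStats output:
stats = {"hosts": {
    "config-01:10005,config-02:10005,config-03:10005":
        {"created": 857, "available": 2},
    "spock/spock-01:10005,spock-02:10005,spock-03:10005":
        {"created": 4, "available": 1}}}

shards, config = split_pools(stats)
print(sorted(shards))  # ['spock/spock-01:10005,spock-02:10005,spock-03:10005']
```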
Note that this command only works on mongos processes and mongods that are members of a shard.
You may see connections to other shards in the output of connPoolStats from a shard, as shards connect to other shards to migrate data. The primary of one shard will connect directly to the primary of another shard and “suck” its data.
When a migrate occurs, a shard sets up a ReplicaSetMonitor (a process that monitors replica set health) to track the health of the shard on the other side of the migrate. mongod never destroys this monitor, so you may see messages in one replica set’s log about the members of another replica set. This is totally normal and should have no effect on your application.
When a client connects to a mongos, mongos creates a connection to at least one shard to pass along the client’s request. Thus, every client connection into a mongos yields at least one outgoing connection from mongos to the shards.
If you have many mongos processes, they may create more connections than your shards can handle: a mongos allows up to 20,000 connections (same as mongod), so if you have 5 mongos processes with 10,000 client connections each, they may be attempting to create 50,000 connections to a shard!
To prevent this, you can use the maxConns option in your command line configuration for mongos to limit the number of connections it can create. The following formula can be used to calculate the maximum number of connections a shard can handle from a single mongos:
Breaking down the pieces of this formula:
- Each mongos creates three connections per mongod: a connection to forward client requests, an error-tracking connection (the writeback listener), and a connection to monitor the replica set’s status.
- The primary creates a connection to each secondary and each secondary creates two connections to the primary, for a total of three connections.
- other is the number of miscellaneous processes that may connect to your mongods, such as MMS agents, direct shell connections (for administration), or connections to other shards for migrations.
Note that maxConns only prevents mongos from creating more than this many connections. It doesn’t mean that it does anything particularly helpful when it runs out of connections: it will block requests, waiting for connections to be “freed.” Thus, you must prevent your application from using this many connections, especially as your number of mongos processes grows.
When a MongoDB instance exits cleanly it closes all connections before stopping. The members who were connected to it will immediately get socket errors on those connections and be able to refresh them. However, if a MongoDB instance suddenly goes offline due to a power loss, crash, or network problems, it probably won’t cleanly close all of its sockets. In this case, other servers in the cluster may be under the impression that their connection is healthy until they try to perform an operation on it. At that point, they will get an error and refresh the connection (if the member is up again at that point).
This is a quick process when there are only a few connections. However, when there are thousands of connections that must be refreshed one by one you can get a lot of errors because each connection to the downed member must be tried, determined to be bad, and re-established. There isn’t a particularly good way of preventing this aside from restarting processes that get bogged down in a reconnection storm.
As your cluster grows, you’ll need to add capacity or change configurations. This section covers how to add and remove servers from your cluster.
You can add new mongos processes at any time. Make sure their --configdb option specifies the correct set of config servers and they should be immediately available for clients to connect to.
To add new shards, use the "addShard" command as shown in Chapter 14.
As you use your sharded cluster, you may want to change the servers in individual shards. To change a shard’s membership, connect directly to the shard’s primary (not through the mongos) and issue a replica set reconfig. The cluster configuration will pick up the change and update config.shards automatically. Do not modify config.shards by hand.
The only exception to this is if you started your cluster with standalone servers as shards, not replica sets.
The easiest way to do this is to add a new, empty replica set shard and then remove the standalone server shard (see Removing a Shard).
If you wish to turn the standalone server into a replica set the process is fairly complex and involves downtime:
Stop requests to the system.
Shut down the standalone server (call it server-1) and all mongos processes.
Restart server-1 in replica set mode (with the --replSet option).
Connect to server-1 and initiate the set as a one-member replica set.
Connect to each config server and replace this shard’s entry in config.shards to have a setName/server-1:27017 form for the shard name. Make sure all three config servers have identical information. It is risky to manually edit config servers!
A good way of ensuring that they are identical is to run the dbhash command on each config server:
> db.runCommand({"dbhash" : 1})
This computes an MD5 sum for each collection. Some collections in the config database will be different on different config servers, but config.shards should not be.
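To compare the dbhash results from each config server, you can diff the per-collection MD5 maps. An illustrative sketch (the helper name and hash values are made up):

```python
def diverging_collections(hashes_by_server):
    """Given {server: {collection: md5}} maps (one per config server,
    as computed by the dbhash command), return the collections whose
    hashes differ across servers. Hypothetical helper for eyeballing
    config server sync."""
    servers = list(hashes_by_server)
    first = hashes_by_server[servers[0]]
    return sorted(
        coll for coll in first
        if any(hashes_by_server[s].get(coll) != first[coll]
               for s in servers[1:]))

# Made-up hash values; some config collections may legitimately differ,
# but "shards" should not:
hashes = {
    "config-01": {"shards": "ab12", "mongos": "9f00"},
    "config-02": {"shards": "ab12", "mongos": "77aa"},
}
print(diverging_collections(hashes))  # ['mongos']
```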
Restart all mongos processes. They will read the shard data off of the config servers at startup and treat the replica set as a shard.
Restart all shards’ primaries to refresh their config data.
Send requests to the system again.
Add other members to server-1’s set.
This process is complex, error prone, and not recommended. If at all possible, just add a new shard that’s an empty replica set and let migrations take care of moving your data to it.
In general, shards should not be removed from a cluster. If you are regularly adding and removing shards, you are putting a lot more stress on the system than necessary. If you add too many shards it is better to let your system grow into it, not remove them and add them back later. However, if necessary, you can remove shards.
First make sure that the balancer is on. The balancer will be tasked with moving all the data on this shard to other shards in a process called draining. To start draining, run the removeShard command. removeShard takes the shard’s name and drains all the chunks on a given shard to the other shards:
> db.adminCommand({"removeShard" : "test-rs3"})
{
    "msg" : "draining started successfully",
    "state" : "started",
    "shard" : "test-rs3",
    "note" : "you need to drop or movePrimary these databases",
    "dbsToMove" : [ "blog", "music", "prod" ],
    "ok" : 1
}
Draining can take a long time if there are a lot of chunks or large chunks to move. If you have jumbo chunks (see Jumbo Chunks), you may have to temporarily raise the chunk size to allow draining to move them.
If you want to keep tabs on how much has been moved, run removeShard again to give you the current status:
> db.adminCommand({"removeShard" : "test-rs3"})
{
    "msg" : "draining ongoing",
    "state" : "ongoing",
    "remaining" : {
        "chunks" : NumberLong(5),
        "dbs" : NumberLong(0)
    },
    "ok" : 1
}
You can run removeShard as many times as you want.
Chunks may have to split to be moved, so you may see the number of chunks increase in the system during the drain. For example, suppose we have a 5-shard cluster with the following chunk distributions:
test-rs0  10
test-rs1  10
test-rs2  10
test-rs3  11
test-rs4  11
This cluster has a total of 52 chunks. If we remove test-rs3, we might end up with:
test-rs0  15
test-rs1  15
test-rs2  15
test-rs4  15
The cluster now has 60 chunks, 19 of which came from shard test-rs3 (11 were there to start and 8 were created from draining splits).
Once all the chunks have been moved, if there are still databases “homed” on the shard, you’ll need to remove them before the shard can be removed. The output of removeShard will be something like:
> db.adminCommand({"removeShard" : "test-rs3"})
{
    "msg" : "draining ongoing",
    "state" : "ongoing",
    "remaining" : {
        "chunks" : NumberLong(0),
        "dbs" : NumberLong(3)
    },
    "note" : "you need to drop or movePrimary these databases",
    "dbsToMove" : [ "blog", "music", "prod" ],
    "ok" : 1
}
To finish the remove, move the homed databases with the movePrimary command:
> db.adminCommand({"movePrimary" : "blog", "to" : "test-rs4"})
{
    "primary " : "test-rs4:test-rs4/ubuntu:31500,ubuntu:31501,ubuntu:31502",
    "ok" : 1
}
Once you have moved any databases, run removeShard one more time:
> db.adminCommand({"removeShard" : "test-rs3"})
{
    "msg" : "removeshard completed successfully",
    "state" : "completed",
    "shard" : "test-rs3",
    "ok" : 1
}
This is not strictly necessary, but it confirms that you have completed the process. If there are no databases homed on this shard, you will get this response as soon as all chunks have been migrated off.
Once you have started a shard draining, there is no built-in way to stop it.
Changing anything about your config servers is difficult, dangerous, and generally involves downtime. Before doing any maintenance on config servers, take a backup.
All mongos processes need to have the same value for --configdb whenever they are running. Thus, to change the config servers, you must shut down all your mongos processes, make sure they are all down (no mongos process can still be running with the old --configdb argument), and then restart them with the new --configdb argument.
For example, one of the most common tasks is to move from one config server to three. To accomplish this, shut down your mongos processes, your config server, and all your shards. Copy the data directory of your config server to the two new config servers (so that there is an identical data directory on all three servers). Now, start up all three config servers and the shards. Then start each of the mongos processes with --configdb pointing to all three config servers.
In general, MongoDB automatically takes care of balancing data. This section covers how to enable and disable this automatic balancing as well as how to intervene in the balancing process.
Turning off the balancer is a prerequisite to nearly any administrative activity. There is a shell helper to make this easier:
> sh.setBalancerState(false)
With the balancer off a new balancing round will not begin, but it will not force an ongoing balancing round to stop immediately: migrations generally cannot stop on a dime. Thus, you should check the config.locks collection to see whether or not a balancing round is still in progress:
> db.locks.findOne({"_id" : "balancer"})["state"]
0
0 means the balancer is off. See The Balancer for an explanation of the balancer states.
Balancing puts load on your system: the destination shard must query the source shard for all the documents in a chunk, insert them, and then the source shard must delete them. There are two circumstances in particular where migrations can cause performance problems:
Using a hotspot shard key will force constant migrations (as all new chunks will be created on the hotspot). Your system must have the capacity to handle the flow of data coming off of your hotspot shard.
Adding a new shard will trigger a stream of migrations as the balancer attempts to populate it.
If you find that migrations are affecting your application’s performance, you can schedule a window for balancing in the config.settings collection. Run the following update to only allow balancing between 1 p.m. and 4 p.m.:
> db.settings.update({"_id" : "balancer"},
... {"$set" : {"activeWindow" : {"start" : "13:00", "stop" : "16:00"}}},
... true)
If you set a balancing window, monitor it closely to ensure that mongos can actually keep your cluster balanced in the time that you have allotted it.
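For monitoring purposes, checking whether a given clock time falls inside a configured activeWindow is straightforward. A sketch (illustrative only; mongos applies the window itself, and the helper name is hypothetical):

```python
from datetime import time

def in_active_window(now, window):
    """Check whether a clock time falls inside a balancing window
    like {"start": "13:00", "stop": "16:00"} from config.settings."""
    start = time(*map(int, window["start"].split(":")))
    stop = time(*map(int, window["stop"].split(":")))
    if start <= stop:
        return start <= now < stop
    return now >= start or now < stop  # window that wraps past midnight

window = {"start": "13:00", "stop": "16:00"}
print(in_active_window(time(14, 30), window))  # True
print(in_active_window(time(16, 0), window))   # False
```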
You must be careful if you plan to combine manual balancing with the automatic balancer, since the automatic balancer always determines what to move based on the current state of the set and does not take into account the set’s history. For example, suppose you have shardA and shardB, each holding 500 chunks. shardA is getting a lot of writes, so you turn off the balancer and move 30 of the most active chunks to shardB. If you turn the balancer back on at this point, it will immediately swoop in and move 30 chunks (possibly a different 30) back from shardB to shardA to balance the chunk counts.
To prevent this, move 30 quiescent chunks from shardB to shardA before starting the balancer. That way there will be no imbalance between the shards and the balancer will be happy to leave things as they are. Alternatively, you could perform 30 splits on shardA’s chunks to even out the chunk counts.
Note that the balancer only uses number of chunks as a metric, not size of data. Thus, a shard with a few large chunks may end up as the target of a migration from a shard with many small chunks (but a smaller data size).
There can be anywhere from zero to millions of documents in a chunk. Generally, the larger a chunk is, the longer it takes to migrate to another shard. In Chapter 13, we used a chunk size of 1 MB, so that we could see chunk movement easily and quickly. This is generally impractical in a live system. MongoDB would be doing a lot of unnecessary work to keep shards within a few megabytes of each other in size. By default, chunks are 64 MB, which is generally a good balance between ease of migration and migratory churn.
Sometimes you may find that migrations are taking too long with 64 MB chunks. To speed them up, you can decrease your chunk size. To do this, connect to mongos through the shell and update the config.settings collection:
> db.settings.findOne()
{
    "_id" : "chunksize",
    "value" : 64
}
> db.settings.save({"_id" : "chunksize", "value" : 32})
The previous update would change your chunk size to 32 MB. Existing chunks would not be changed immediately, but as splits occurred chunks would trend toward that size. mongos processes will automatically load the new chunk size value.
Note that this is a cluster-wide setting: it affects all collections and databases. Thus, if you need a small chunk size for one collection and a large chunk size for another, you may have to compromise with a chunk size in between the two ideals (or put the collections in different clusters).
If MongoDB is doing too many migrations or your documents are large, you may want to increase your chunk size.
As mentioned earlier, all the data in a chunk lives on a certain shard. If that shard ends up with more chunks than the other shards, MongoDB will move some chunks off it. Moving a chunk is called a migration and is how MongoDB balances data across your cluster.
You can manually move chunks using the moveChunk shell helper:
> sh.moveChunk("test.users", {"user_id" : NumberLong("1844674407370955160")},
... "spock")
{ "millis" : 4079, "ok" : 1 }
This would move the chunk containing the document with "user_id" of 1844674407370955160 to the shard named "spock". You must use the shard key to find which chunk to move ("user_id", in this case). Generally, the easiest way to specify a chunk is by its lower bound, although any value in the chunk will work (the upper bound will not, as it is not actually in the chunk). This command will move the chunk before returning, so it may take a while to run. The logs are the best place to see what it is doing if it takes a long time.
If a chunk is larger than the max chunk size, mongos will refuse to move it:
> sh.moveChunk("test.users", {"user_id" : NumberLong("1844674407370955160")},
... "spock")
{
    "cause" : {
        "chunkTooBig" : true,
        "estimatedChunkSize" : 2214960,
        "ok" : 0,
        "errmsg" : "chunk too big to move"
    },
    "ok" : 0,
    "errmsg" : "move failed"
}
In this case, you must manually split the chunk before moving it, using the splitAt command:
> db.chunks.find({"ns" : "test.users",
... "min.user_id" : NumberLong("1844674407370955160")})
{
    "_id" : "test.users-user_id_NumberLong("1844674407370955160")",
    "ns" : "test.users",
    "min" : { "user_id" : NumberLong("1844674407370955160") },
    "max" : { "user_id" : NumberLong("2103288923412120952") },
    "shard" : "test-rs2"
}
> sh.splitAt("test.users", {"user_id" : NumberLong("2000000000000000000")})
{ "ok" : 1 }
> db.chunks.find({"ns" : "test.users",
... "min.user_id" : {"$gte" : NumberLong("1844674407370955160")},
... "max.user_id" : {"$lte" : NumberLong("2103288923412120952")}})
{
    "_id" : "test.users-user_id_NumberLong("1844674407370955160")",
    "ns" : "test.users",
    "min" : { "user_id" : NumberLong("1844674407370955160") },
    "max" : { "user_id" : NumberLong("2000000000000000000") },
    "shard" : "test-rs2"
}
{
    "_id" : "test.users-user_id_NumberLong("2000000000000000000")",
    "ns" : "test.users",
    "min" : { "user_id" : NumberLong("2000000000000000000") },
    "max" : { "user_id" : NumberLong("2103288923412120952") },
    "shard" : "test-rs2"
}
Once the chunk has been split into smaller pieces, it should be movable. Alternatively, you can raise the max chunk size and then move it, but you should break up large chunks whenever possible. Sometimes, though, they cannot be broken up: these are called jumbo chunks.
Suppose you choose the "date" field as your shard key. The "date" field in this collection is a string that looks like "year/month/day", which means that mongos can create at most one chunk per day. This works fine for a while, until your application suddenly goes viral and gets a thousand times its typical traffic for one day.
This day’s chunk is going to be much larger than any other day’s, but it is also completely unsplittable because every document has the same value for the shard key.
Once a chunk is larger than the max chunk size set in config.settings, the balancer will not be allowed to move the chunk. These unsplittable, unmovable chunks are called jumbo chunks and they are inconvenient to deal with.
Let’s take an example. Suppose there are three shards, shard1, shard2, and shard3. If you use the hotspot shard key pattern described in Ascending Shard Keys, all your writes will be going to one shard, say shard1. mongos will try to balance the number of chunks evenly between the shards. But the only chunks that the balancer can move are the non-jumbo chunks, so it will migrate all the small chunks off the hotspot shard.
Now all the shards will have roughly the same number of chunks, but all of shard2 and shard3’s chunks will be less than 64 MB in size. And if jumbo chunks are being created, more and more of shard1’s chunks will be more than 64 MB in size. Thus, shard1 will fill up a lot faster than the other two shards, even though the number of chunks is perfectly balanced between the three.
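To make the imbalance concrete, here is a small Python illustration with made-up chunk sizes: the chunk counts are identical across shards, but shard1 holds several times the data of the others:

```python
# Hypothetical chunk sizes in MB. The chunk counts are balanced (three
# chunks per shard), but shard1's chunks are all jumbo (> 64 MB) and
# therefore cannot be moved by the balancer.
shards = {
    "shard1": [200, 180, 150],  # jumbo chunks, stuck in place
    "shard2": [40, 30, 20],
    "shard3": [35, 25, 15],
}

for name, chunks in shards.items():
    print(name, "holds", len(chunks), "chunks totaling", sum(chunks), "MB")
```

With these numbers, shard1 holds 530 MB while shard2 and shard3 hold only 90 MB and 75 MB: "balanced" by chunk count, badly imbalanced by size.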
Thus, one of the indicators that you have jumbo chunk problems is
that one shard’s size is growing much faster than the others. You can
also look at sh.status()
to see if
you have jumbo chunks: they will be marked with a "jumbo"
attribute:
> sh.status()
...
        { "x" : -7 } -->> { "x" : 5 } on : shard0001
        { "x" : 5 } -->> { "x" : 6 } on : shard0001 jumbo
        { "x" : 6 } -->> { "x" : 7 } on : shard0001 jumbo
        { "x" : 7 } -->> { "x" : 339 } on : shard0001
...
You can use the dataSize
command to check chunk sizes.
First, we use the config.chunks collection to find the chunk ranges:
> use config
> var chunks = db.chunks.find({"ns" : "acme.analytics"}).toArray()
Use these chunk ranges to find possible jumbo chunks:
> use dbName
> db.runCommand({"dataSize" : "dbName.collName",
... "keyPattern" : {"date" : 1}, // shard key
... "min" : chunks[0].min,
... "max" : chunks[0].max})
{ "size" : 11270888, "numObjects" : 128081, "millis" : 100, "ok" : 1 }
Be careful—the dataSize command
does have to scan the chunk’s data to figure out how big it is. If you
can, narrow down your search by using your knowledge of your data: were
jumbo chunks created on a certain date? For example, if there was a really
busy day on November 1, look for chunks with that day in their shard key
range. If you’re using GridFS and sharding by "files_id"
, you can look at the fs.files collection to find a file’s
size.
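Assuming you have already collected each chunk's dataSize result into a list of (min key, size) pairs, flagging the oversized ones is a simple filter. A Python sketch (`jumbo_candidates` is a hypothetical helper):

```python
MAX_CHUNK_BYTES = 64 * 1024 * 1024  # the 64 MB default chunk size

def jumbo_candidates(chunk_sizes):
    """chunk_sizes: list of (chunk_min_key, size_in_bytes) pairs, where
    each size comes from running dataSize over that chunk's range.
    Returns the min keys of chunks too big for the balancer to move."""
    return [min_key for min_key, size in chunk_sizes
            if size > MAX_CHUNK_BYTES]

# e.g. the chunk measured above (11270888 bytes) is well under the limit:
print(jumbo_candidates([("2012/11/01", 200 * 1024 * 1024),
                        ("2012/11/02", 11270888)]))
```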
To fix a cluster thrown off-balance by jumbo chunks, you must evenly distribute them among the shards.
This is a complex manual process, but should not cause any downtime (it may cause slowness, as you’ll be migrating a lot of data). In the description below, the shard with the jumbo chunks is referred to as the “from” shard. The shards that the jumbo chunks are migrated to are called the “to” shards. Note that you may have multiple “from” shards that you wish to move chunks off of. Repeat these steps for each:
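Before moving anything, it can help to plan how many jumbo chunks each “to” shard should receive so that they end up evenly spread. A Python sketch (`moves_per_target` is a hypothetical planning helper, not part of MongoDB):

```python
def moves_per_target(jumbo_count, to_shards):
    """Spread jumbo_count chunks as evenly as possible across to_shards.
    Returns a dict mapping each "to" shard to the number of jumbo
    chunks it should receive."""
    base, extra = divmod(jumbo_count, len(to_shards))
    return {shard: base + (1 if i < extra else 0)
            for i, shard in enumerate(to_shards)}

print(moves_per_target(5, ["shard0001", "shard0002"]))
# {'shard0001': 3, 'shard0002': 2}
```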
Turn off the balancer. You don’t want the balancer trying to “help” during this process:
> sh.setBalancerState(false)
MongoDB will not allow you to move chunks larger than the max chunk size, so temporarily raise the chunk size. Make a note of what your original chunk size is and then change it to something large, like 10,000. Chunk size is specified in megabytes:
> use config
> db.settings.findOne({"_id" : "chunksize"})
{ "_id" : "chunksize", "value" : 64 }
> db.settings.save({"_id" : "chunksize", "value" : 10000})
Use the moveChunk
command
to move jumbo chunks off the “from” shard. If you are concerned
about the impact migrations will have on your application’s
performance, use the secondaryThrottle
option to
prevent them from happening too quickly:
> db.adminCommand({"moveChunk" : "acme.analytics",
... "find" : {"date" : new Date("10/23/2012")},
... "to" : "shard0002",
... "secondaryThrottle" : true})
secondaryThrottle
forces migrations to
periodically wait until a majority of secondaries have replicated
the migration. It only works if you are running with shards that
are replica sets (not standalone servers).
Run splitChunk on the remaining chunks on the donor (“from”) shard until it has roughly the same number of chunks as the other shards.
Set chunk size back to its original value:
> db.settings.save({"_id" : "chunksize", "value" : 64})
Turn on the balancer:
> sh.setBalancerState(true)
When the balancer is turned back on, it still cannot move the jumbo chunks; they are essentially held in place by their size.
As the amount of data you are storing grows, the manual process described in the previous section becomes unsustainable. Thus, if you’re having problems with jumbo chunks, you should make it a priority to prevent them from forming.
To prevent jumbo chunks, modify your shard key to have more
granularity. You want almost every document to have a unique value for
the shard key, or at least never have more than
chunksize
-worth of data with a single shard
key value.
For example, if you were using the year/month/day key described earlier, it can quickly be made finer-grained by adding hours, minutes, and seconds. Similarly, if you’re sharding on a coarsely grained key like log level, add a second field with a lot of granularity to your shard key, such as an MD5 hash or UUID. Then you can always split a chunk, even when the first field is identical across many documents.
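As a sketch of the hash-tiebreaker idea, here is how an application might compute such a compound key at insert time (Python; the field names are hypothetical):

```python
import hashlib

def make_shard_key(doc):
    """Pair a coarse field ("level") with an MD5 hash of the document's
    unique id, so documents sharing a level still get distinct shard
    key values and their chunks remain splittable."""
    return {"level": doc["level"],
            "tiebreak": hashlib.md5(str(doc["_id"]).encode()).hexdigest()}

a = make_shard_key({"_id": 1, "level": "INFO"})
b = make_shard_key({"_id": 2, "level": "INFO"})
print(a["level"] == b["level"], a["tiebreak"] != b["tiebreak"])  # True True
```

The coarse field keeps related documents near each other, while the hash guarantees a split point always exists within any single level.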
As a final tip, sometimes mongos will not update its configuration correctly from the config servers. If you ever get a configuration that you don’t expect or a mongos seems to be out of date or cannot find data that you know is there, use the flushRouterConfig command to manually clear all caches:
> db.adminCommand({"flushRouterConfig" : 1})
If flushRouterConfig does not work, restarting all your mongos or mongod processes clears any possible cache.