Chapter 16. Sharding Administration

A sharded cluster is the most difficult type of deployment to administer. This chapter gives advice on performing administrative tasks on all parts of a cluster, including:

  • Inspecting the cluster’s state: who its members are, where data is held, and what connections are open

  • Adding, removing, and changing members of a cluster

  • Administering data movement and manually moving data

Seeing the Current State

There are several helpers available to find out what data is where, what the shards are, and what the cluster is doing.

Getting a Summary with sh.status

sh.status() gives you an overview of your shards, databases, and sharded collections. If you have a small number of chunks, it will print a breakdown of which chunks are where as well. Otherwise it will simply give the collection’s shard key and how many chunks each shard has:

> sh.status()
--- Sharding Status --- 
  sharding version: { "_id" : 1, "version" : 3 }
  shards:
    { "_id" : "shard0000", "host" : "localhost:30000", 
      "tags" : [ "USPS" , "Apple" ] }
    {  "_id" : "shard0001",  "host" : "localhost:30001" }
    {  "_id" : "shard0002",  "host" : "localhost:30002",  "tags" : [ "Apple" ] }
  databases:
    {  "_id" : "admin",  "partitioned" : false,  "primary" : "config" }
    {  "_id" : "test",  "partitioned" : true,  "primary" : "shard0001" }
      test.foo
        shard key: { "x" : 1, "y" : 1 }
        chunks:
          shard0000    4
          shard0002    4
          shard0001    4
        { "x" : { $minKey : 1 }, "y" : { $minKey : 1 } } -->> 
            { "x" : 0, "y" : 10000 } on : shard0000
        { "x" : 0, "y" : 10000 } -->> { "x" : 12208, "y" : -2208 } 
            on : shard0002
        { "x" : 12208, "y" : -2208 } -->> { "x" : 24123, "y" : -14123 } 
            on : shard0000 
        { "x" : 24123, "y" : -14123 } -->> { "x" : 39467, "y" : -29467 } 
            on : shard0002
        { "x" : 39467, "y" : -29467 } -->> { "x" : 51382, "y" : -41382 } 
            on : shard0000
        { "x" : 51382, "y" : -41382 } -->> { "x" : 64897, "y" : -54897 } 
            on : shard0002
        { "x" : 64897, "y" : -54897 } -->> { "x" : 76812, "y" : -66812 } 
            on : shard0000
        { "x" : 76812, "y" : -66812 } -->> { "x" : 92793, "y" : -82793 } 
            on : shard0002
        { "x" : 92793, "y" : -82793 } -->> { "x" : 119599, "y" : -109599 } 
            on : shard0001
        { "x" : 119599, "y" : -109599 } -->> { "x" : 147099, "y" : -137099 } 
            on : shard0001
        { "x" : 147099, "y" : -137099 } -->> { "x" : 173932, "y" : -163932 } 
            on : shard0001
        { "x" : 173932, "y" : -163932 } -->> 
            { "x" : { $maxKey : 1 }, "y" : { $maxKey : 1 } } on : shard0001
      test.ips
        shard key: { "ip" : 1 }
        chunks:
          shard0000    2
          shard0002    3
          shard0001    3
        { "ip" : { $minKey : 1 } } -->> { "ip" : "002.075.101.096" } 
          on : shard0000
        { "ip" : "002.075.101.096" } -->> { "ip" : "022.089.076.022" } 
          on : shard0002 
        { "ip" : "022.089.076.022" } -->> { "ip" : "038.041.058.074" }
          on : shard0002 
        { "ip" : "038.041.058.074" } -->> { "ip" : "055.081.104.118" }
          on : shard0002 
        { "ip" : "055.081.104.118" } -->> { "ip" : "072.034.009.012" }
          on : shard0000 
        { "ip" : "072.034.009.012" } -->> { "ip" : "090.118.120.031" }
          on : shard0001 
        { "ip" : "090.118.120.031" } -->> { "ip" : "127.126.116.125" }
          on : shard0001
        { "ip" : "127.126.116.125" } -->> { "ip" : { $maxKey : 1 } }
          on : shard0001
          tag: Apple { "ip" : "017.000.000.000" } -->> { "ip" : "018.000.000.000" }
          tag: USPS { "ip" : "056.000.000.000" } -->> { "ip" : "057.000.000.000" }
    {  "_id" : "test2",  "partitioned" : false,  "primary" : "shard0002" }

Once there are more than a few chunks, sh.status() will summarize the chunk stats instead of printing each chunk. To see all chunks, run sh.status(true) (the true tells sh.status() to be verbose).

All the information sh.status() shows is gathered from your config database.

sh.status() runs a MapReduce to collect this data, so you cannot run sh.status() when using the --noscripting option.

Seeing Configuration Information

All of the configuration information about your cluster is kept in collections in the config database on the config servers. The shell has several helpers for exposing this information in a more readable way, but you can always query the config database directly for metadata about your cluster.

Warning

Never connect directly to your config servers, as you do not want to take the chance of accidentally changing or removing config server data. Instead, connect to the mongos and use the config database to see its data, as you would for any other database:

mongos> use config

If you manipulate config data through mongos (instead of connecting directly to the config servers), mongos will ensure that all of your config servers stay in sync and prevent various dangerous actions like accidentally dropping the config database.

In general, you should not directly change any data in the config database (exceptions are noted below). If you do change anything, you will generally have to restart all of your mongos servers to see its effect.

There are several collections in the config database. This section covers what each one contains and how it can be used.
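
You can list these collections from the shell once you have switched to the config database; the exact set varies a bit by MongoDB version, but the collections covered below should all be present on a sharded cluster:

> use config
> db.getCollectionNames()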

config.shards

The shards collection keeps track of all the shards in the cluster. A typical document in the shards collection looks something like this:

> db.shards.findOne()
{
    "_id" : "spock", 
    "host" : "spock/server-1:27017,server-2:27017,server-3:27017",
    "tags" : [
        "us-east",
        "64gb mem",
        "cpu3"
    ]
}

The shard’s "_id" is picked up from the replica set name, so each replica set in your cluster must have a unique name.

When you update your replica set configuration (e.g., adding or removing members), the "host" field will be updated automatically.
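
Because config.shards is an ordinary collection, you can also query it directly through mongos. For example, to find every shard carrying a particular tag (using the "us-east" tag from the document above):

> use config
> db.shards.find({"tags" : "us-east"})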

config.databases

The databases collection keeps track of all of the databases, sharded and non-sharded, that the cluster knows about:

> db.databases.find()
{ "_id" : "admin", "partitioned" : false, "primary" : "config" }
{ "_id" : "test1", "partitioned" : true, "primary" : "spock" }
{ "_id" : "test2", "partitioned" : false, "primary" : "bones" }

If enableSharding has been run on a database, "partitioned" will be true. The "primary" is the database’s “home base.” By default, all new collections in that database will be created on the database’s primary shard.
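
For example, to list only the databases that have had sharding enabled, query on the "partitioned" field:

> db.databases.find({"partitioned" : true})
{ "_id" : "test1", "partitioned" : true, "primary" : "spock" }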

config.collections

The collections collection keeps track of all sharded collections (non-sharded collections are not shown). A typical document looks something like this:

> db.collections.findOne()
{ 
    "_id" : "test.foo", 
    "lastmod" : ISODate("1970-01-16T17:53:52.934Z"), 
    "dropped" : false, 
    "key" : { "x" : 1, "y" : 1 }, 
    "unique" : true 
}

The important fields are:

"_id"

The namespace of the collection.

"key"

The shard key. In this case, it is a compound key on "x" and "y".

"unique"

Indicates that the shard key is a unique index. This field is not displayed unless it is true (the shard key is unique). By default, the shard key is not unique.
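
For example, to check which shard key a collection was sharded with, look up its document by namespace:

> db.collections.findOne({"_id" : "test.foo"})["key"]
{ "x" : 1, "y" : 1 }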

config.chunks

The chunks collection keeps a record of each chunk in all the collections. A typical document in the chunks collection might look something like this:

{ 
    "_id" : "test.hashy-user_id_-1034308116544453153", 
    "lastmod" : { "t" : 5000, "i" : 50 }, 
    "lastmodEpoch" : ObjectId("50f5c648866900ccb6ed7c88"), 
    "ns" : "test.hashy", 
    "min" : { "user_id" : NumberLong("-1034308116544453153") }, 
    "max" : { "user_id" : NumberLong("-732765964052501510") }, 
    "shard" : "test-rs2" 
}

The most useful fields are:

"_id"

The unique identifier for the chunk. Generally this is the namespace, shard key, and lower chunk boundary.

"ns"

The collection that this chunk is from.

"in"

The smallest value in the chunk’s range (inclusive).

"max"

All values in the chunk are smaller than this value.

"shard"

Which shard the chunk resides on.

The "lastmod" and "lastmodEpoch" fields are used to track chunk versioning. For example, if a chunk "foo.bar-_id-1" split into two chunks, we’d want a way of distinguishing the new, smaller "foo.bar-_id-1" chunk from its previous incarnation. Thus, the "t" and "i" fields are the major and minor chunk versions: major versions change when a chunk is migrated to a new shard and minor versions change when a chunk is split.

sh.status() uses the config.chunks collection to gather most of its information.
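
You can also reproduce the per-shard chunk counts that sh.status() prints by aggregating this collection directly. This is just a sketch and assumes your shell and server are recent enough to support the aggregation framework (MongoDB 2.2 or later):

> use config
> db.chunks.aggregate([
...     {"$match" : {"ns" : "test.foo"}},
...     {"$group" : {"_id" : "$shard", "nChunks" : {"$sum" : 1}}}
... ])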

config.changelog

The changelog collection is useful for keeping track of what a cluster is doing, since it records all of the splits and migrates that have occurred.

Splits are recorded in a document that looks like this:

{ 
    "_id" : "router1-2013-02-09T18:08:12-5116908cab10a03b0cd748c3", 
    "server" : "spock-01", 
    "clientAddr" : "10.3.1.71:62813", 
    "time" : ISODate("2013-02-09T18:08:12.574Z"), 
    "what" : "split", 
    "ns" : "test.foo", 
    "details" : { 
        "before" : { 
            "min" : { "x" : { $minKey : 1 }, "y" : { $minKey : 1 } }, 
            "max" : { "x" : { $maxKey : 1 }, "y" : { $maxKey : 1 } }, 
            "lastmod" : Timestamp(1000, 0), 
            "lastmodEpoch" : ObjectId("000000000000000000000000") 
        }, 
        "left" : { 
            "min" : { "x" : { $minKey : 1 }, "y" : { $minKey : 1 } }, 
            "max" : { "x" : 0, "y" : 10000 }, 
            "lastmod" : Timestamp(1000, 1), 
            "lastmodEpoch" : ObjectId("000000000000000000000000") 
        }, 
        "right" : { 
            "min" : { "x" : 0, "y" : 10000 }, 
            "max" : { "x" : { $maxKey : 1 }, "y" : { $maxKey : 1 } }, 
            "lastmod" : Timestamp(1000, 2), 
            "lastmodEpoch" : ObjectId("000000000000000000000000") 
        } 
    } 
}

The "details" give information about what the original document looked like and what it split into.

This output is what the first chunk split of a collection looks like. Note that each new chunk has its minor version increment: "lastmod" is Timestamp(1000, 1) and Timestamp(1000, 2), respectively.

Migrates are a bit more complicated and actually create four separate changelog documents: one noting the start of the migrate, one for the "from" shard, one for the "to" shard, and one for the migrate’s commit (when the migration is finalized). The middle two documents are of interest because these give a breakdown of how long each step in the process took. This can give you an idea whether it’s the disk, network, or something else that is causing a bottleneck on migrates.

For example, the document created by the "from" shard looks like this:

{ 
     "_id" : "router1-2013-02-09T18:15:14-5116923271b903e42184211c", 
     "server" : "spock-01", 
     "clientAddr" : "10.3.1.71:27017", 
     "time" : ISODate("2013-02-09T18:15:14.388Z"), 
     "what" : "moveChunk.to", 
     "ns" : "test.foo", 
     "details" : { 
         "min" : { "x" : 24123, "y" : -14123 }, 
         "max" : { "x" : 39467, "y" : -29467 }, 
         "step1 of 5" : 0, 
         "step2 of 5" : 0, 
         "step3 of 5" : 900, 
         "step4 of 5" : 0, 
         "step5 of 5" : 142 
     } 
};

Each of the steps listed in "details" is timed and the "stepN of 5" messages show how long the step took, in milliseconds. When the "from" shard receives a moveChunk command from the mongos, it:

  1. Checks the command parameters.

  2. Confirms with the config servers that it can acquire a distributed lock for the migrate.

  3. Tries to contact the "to" shard.

  4. The data is copied. This is referred to and logged as “the critical section.”

  5. Coordinates with the "to" shard and config servers to confirm the migrate.

Note that the "to" and "from" shards must be in close communication starting at "step4 of 5": the shards directly talk to one another and the config server to perform the migration. If the "from" server has flaky network connectivity during the final steps, it may end up in a state where it cannot undo the migrate and cannot move forward with it. In this case, the mongod will shut down.

The "to" shard’s changelog document is similar to the "from" shard’s, but the steps are a bit different. It looks like:

{ 
     "_id" : "router1-2013-02-09T18:15:14-51169232ab10a03b0cd748e5", 
     "server" : "spock-01", 
     "clientAddr" : "10.3.1.71:62813", 
     "time" : ISODate("2013-02-09T18:15:14.391Z"), 
     "what" : "moveChunk.from", 
     "ns" : "test.foo", 
     "details" : { 
          "min" : { "x" : 24123, "y" : -14123 }, 
          "max" : { "x" : 39467, "y" : -29467 }, 
          "step1 of 6" : 0, 
          "step2 of 6" : 2, 
          "step3 of 6" : 33, 
          "step4 of 6" : 1032, 
          "step5 of 6" : 12, 
          "step6 of 6" : 0 
     } 
}

When the "to" shard receives a command from the "from" shard, it:

  1. Migrates indexes. If this shard has never held chunks from the migrated collection before, it needs to know what fields are indexed. If this isn’t the first time a chunk from this collection is being moved to this shard, then this should be a no-op.

  2. Deletes any existing data in the chunk range. There might be data left over from a failed migration or restore procedure, which we wouldn’t want interfering with the current data.

  3. Copies all of the documents in the chunk from the "from" shard.

  4. Replays any operations that happened to these documents during the copy.

  5. Waits until it has replicated the newly migrated data to a majority of its servers.

  6. Commits the migrate by changing the chunk’s metadata to say that it lives on the "to" shard.
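
If you want to pull these timing documents out of the changelog yourself, query on the "what" field. For example, to see the ten most recent migration-related entries:

> use config
> db.changelog.find({"what" : /^moveChunk/}).sort({"time" : -1}).limit(10)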

config.tags

This collection is created if you configure shard tags for your system. Each tag is associated with a chunk range:

> db.tags.find()
{
    "_id" : {
        "ns" : "test.ips",
        "min" : {"ip" : "056.000.000.000"}
    },
    "ns" : "test.ips",
    "min" : {"ip" : "056.000.000.000"},
    "max" : {"ip" : "057.000.000.000"},
    "tag" : "USPS"
}
{
    "_id" : {
        "ns" : "test.ips",
        "min" : {"ip" : "017.000.000.000"}
    },
    "ns" : "test.ips",
    "min" : {"ip" : "017.000.000.000"},
    "max" : {"ip" : "018.000.000.000"},
    "tag" : "Apple"
}

config.settings

This collection contains documents representing the current balancer settings and chunk size. By changing the documents in this collection, you can turn the balancer on or off or change the chunk size. Note that you should always connect to mongos, not the config servers directly, to change values in this collection.
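
For example, with a 64 MB chunk size and the balancer switched off, the settings documents look roughly like this (the exact fields can vary by version):

> db.settings.find()
{ "_id" : "chunksize", "value" : 64 }
{ "_id" : "balancer", "stopped" : true }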

Tracking Network Connections

There are a lot of connections between the components of a cluster. This section covers some sharding-specific information. See Chapter 23 for more information on networking.

Getting Connection Statistics

There is a command, connPoolStats, for finding out connection information about mongoses and mongods. This gives you information about how many connections a server has open, and to what:

> db.adminCommand({"connPoolStats" : 1})
{
    "createdByType": {
        "sync": 857,
        "set": 4
    },
    "numDBClientConnection": 35,
    "numAScopedConnection": 0,
    "hosts": {
        "config-01:10005,config-02:10005,config-03:10005": {
            "created": 857,
            "available": 2
        },
        "spock/spock-01:10005,spock-02:10005,spock-03:10005": {
            "created": 4,
            "available": 1
        }
    },
    "totalAvailable": 3,
    "totalCreated": 861,
    "ok": 1
}

Hosts of the form "host1,host2,host3" are config server connections, also known as “sync” connections. Hosts that look like "name/host1,host2,...,hostN" are connections to shards. The "available" counts are how many connections are currently available in the connection pools on this instance.

Note that this command only works on mongos processes and mongods that are members of a shard.

You may see connections to other shards in the output of connPoolStats from a shard, as shards connect to other shards to migrate data. The primary of one shard will connect directly to the primary of another shard and “suck” its data.

When a migrate occurs, a shard sets up a ReplicaSetMonitor (a process that monitors replica set health) to track the health of the shard on the other side of the migrate. mongod never destroys this monitor, so you may see messages in one replica set’s log about the members of another replica set. This is totally normal and should have no effect on your application.

Limiting the Number of Connections

When a client connects to a mongos, mongos creates a connection to at least one shard to pass along the client’s request. Thus, every client connection into a mongos yields at least one outgoing connection from mongos to the shards.

If you have many mongos processes, they may create more connections than your shards can handle: a mongos allows up to 20,000 connections (same as mongod), so if you have 5 mongos processes with 10,000 client connections each, they may be attempting to create 50,000 connections to a shard!

To prevent this, use the maxConns option in your mongos command-line configuration to limit the number of connections it can create. The following formula can be used to calculate the maximum number of connections a shard can handle from a single mongos:

maxConns = 20,000 − (numMongosProcesses × 3) − (numMembersPerReplicaSet × 3) − (other / numMongosProcesses)

Breaking down the pieces of this formula:

(numMongosProcesses × 3)

Each mongos creates three connections per mongod: a connection to forward client requests, an error-tracking connection (the writeback listener), and a connection to monitor the replica set’s status.

(numMembersPerReplicaSet × 3)

The primary creates a connection to each secondary and each secondary creates two connections to the primary, for a total of three connections.

(other / numMongosProcesses)

other is the number of miscellaneous processes that may connect to your mongods, such as MMS agents, direct shell connections (for administration), or connections to other shards for migrations.
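
For example, here is the formula worked through in the shell for a hypothetical deployment with 5 mongos processes, 3-member replica sets as shards, and an allowance of 5,000 “other” connections (these numbers are made up for illustration):

> var numMongosProcesses = 5
> var numMembersPerReplicaSet = 3
> var other = 5000
> 20000 - (numMongosProcesses * 3) - (numMembersPerReplicaSet * 3) - (other / numMongosProcesses)
18976

According to the formula, each mongos in this deployment should be capped at roughly 19,000 connections.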

Note that maxConns only prevents mongos from creating more than this many connections. It doesn’t mean that it does anything particularly helpful when it runs out of connections: it will block requests, waiting for connections to be “freed.” Thus, you must prevent your application from using this many connections, especially as your number of mongos processes grows.

When a MongoDB instance exits cleanly it closes all connections before stopping. The members who were connected to it will immediately get socket errors on those connections and be able to refresh them. However, if a MongoDB instance suddenly goes offline due to a power loss, crash, or network problems, it probably won’t cleanly close all of its sockets. In this case, other servers in the cluster may be under the impression that their connection is healthy until they try to perform an operation on it. At that point, they will get an error and refresh the connection (if the member is up again at that point).

This is a quick process when there are only a few connections. However, when there are thousands of connections that must be refreshed one by one you can get a lot of errors because each connection to the downed member must be tried, determined to be bad, and re-established. There isn’t a particularly good way of preventing this aside from restarting processes that get bogged down in a reconnection storm.

Server Administration

As your cluster grows, you’ll need to add capacity or change configurations. This section covers how to add and remove servers from your cluster.

Adding Servers

You can add new mongos processes at any time. Make sure their --configdb option specifies the correct set of config servers and they will immediately be available for clients to connect to.

To add new shards, use the "addShard" command as shown in Chapter 14.

Changing Servers in a Shard

As you use your sharded cluster, you may want to change the servers in individual shards. To change a shard’s membership, connect directly to the shard’s primary (not through the mongos) and issue a replica set reconfig. The cluster configuration will pick up the change and update config.shards automatically. Do not modify config.shards by hand.

The only exception to this is if you started your cluster with standalone servers as shards, not replica sets.

Changing a shard from a standalone server to replica set

The easiest way to do this is to add a new, empty replica set shard and then remove the standalone server shard (see Removing a Shard).

If you wish to turn the standalone server into a replica set the process is fairly complex and involves downtime:

  1. Stop requests to the system.

  2. Shut down the standalone server (call it server-1) and all mongos processes.

  3. Restart server-1 in replica set mode (with the --replSet option).

  4. Connect to server-1 and initiate the set as a one-member replica set.

  5. Connect to each config server and update this shard’s entry in config.shards so that the shard’s host is in the setName/server-1:27017 form. Make sure all three config servers have identical information. It is risky to manually edit config servers!

    A good way of ensuring that they are identical is to run the dbhash command on each config server:

    > db.runCommand({"dbhash" : 1})

    This computes an MD5 sum for each collection. Some collections in the config database will be different on different config servers, but config.shards should not be (see the sketch following this list).

  6. Restart all mongos processes. They will read the shard data off of the config servers at start up and treat the replica set as a shard.

  7. Restart all shards’ primaries to refresh their config data.

  8. Send requests to the system again.

  9. Add other members to server-1’s set.
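
As mentioned in step 5, here is a sketch of comparing the config.shards hash across all three config servers from a single shell. The hostnames are hypothetical and the layout of the dbhash output’s "collections" field may vary by version, so treat this as a starting point:

> var hosts = ["config-01:10005", "config-02:10005", "config-03:10005"]
> hosts.forEach(function(host) {
...     // connect to each config server and hash its config database
...     var res = new Mongo(host).getDB("config").runCommand({"dbhash" : 1})
...     print(host + " config.shards: " + res.collections.shards)
... })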

This process is complex, error prone, and not recommended. If at all possible, just add a new shard that’s an empty replica set and let migrations take care of moving your data to it.

Removing a Shard

In general, shards should not be removed from a cluster. If you are regularly adding and removing shards, you are putting a lot more stress on the system than necessary. If you add too many shards it is better to let your system grow into it, not remove them and add them back later. However, if necessary, you can remove shards.

First make sure that the balancer is on. The balancer will be tasked with moving all the data on this shard to other shards in a process called draining. To start draining, run the removeShard command. removeShard takes the shard’s name and drains all the chunks on a given shard to the other shards:

> db.adminCommand({"removeShard" : "test-rs3"})
{
    "msg" : "draining started successfully",
    "state" : "started",
    "shard" : "test-rs3",
    "note" : "you need to drop or movePrimary these databases",
    "dbsToMove" : [
        "blog",
        "music",
        "prod"
    ],
    "ok" : 1
}

Draining can take a long time if there are a lot of chunks or large chunks to move. If you have jumbo chunks (see Jumbo Chunks), you may have to temporarily raise the chunk size to allow draining to move them.

If you want to keep tabs on how much has been moved, run removeShard again to give you the current status:

> db.adminCommand({"removeShard" : "test-rs3"})
{
    "msg" : "draining ongoing",
    "state" : "ongoing",
    "remaining" : {
        "chunks" : NumberLong(5),
        "dbs" : NumberLong(0)
    },
    "ok" : 1
}

You can run removeShard as many times as you want.

Chunks may have to split to be moved, so you may see the number of chunks increase in the system during the drain. For example, suppose we have a 5-shard cluster with the following chunk distributions:

test-rs0    10
test-rs1    10
test-rs2    10
test-rs3    11
test-rs4    11

This cluster has a total of 52 chunks. If we remove test-rs3, we might end up with:

test-rs0    15
test-rs1    15
test-rs2    15
test-rs4    15

The cluster now has 60 chunks, 19 of which came from shard test-rs3 (11 were there to start and 8 were created from draining splits).

Once all the chunks have been moved, if there are still databases “homed” on the shard, you’ll need to remove them before the shard can be removed. The output of removeShard will be something like:

> db.adminCommand({"removeShard" : "test-rs3"})
{
    "msg" : "draining ongoing",
    "state" : "ongoing",
    "remaining" : {
        "chunks" : NumberLong(0),
        "dbs" : NumberLong(3)
    },
    "note" : "you need to drop or movePrimary these databases",
    "dbsToMove" : [
        "blog",
        "music",
        "prod"
    ],
    "ok" : 1
}

To finish the remove, move the homed databases with the movePrimary command:

> db.adminCommand({"movePrimary" : "blog", "to" : "test-rs4"})
{
    "primary " : "test-rs4:test-rs4/ubuntu:31500,ubuntu:31501,ubuntu:31502",
    "ok" : 1
}

Once you have moved any databases, run removeShard one more time:

> db.adminCommand({"removeShard" : "test-rs3"})
{
    "msg" : "removeshard completed successfully",
    "state" : "completed",
    "shard" : "test-rs3",
    "ok" : 1
}

This is not strictly necessary, but it confirms that you have completed the process. If there are no databases homed on this shard, you will get this response as soon as all chunks have been migrated off.

Once you have started a shard draining, there is no built-in way to stop it.

Changing Config Servers

Changing anything about your config servers is difficult, dangerous, and generally involves downtime. Before doing any maintenance on config servers, take a backup.

All mongos processes need to have the same value for --configdb whenever they are running. Thus, to change the config servers, you must shut down all your mongos processes, make sure they are all down (no mongos process can still be running with the old --configdb argument), and then restart them with the new --configdb argument.

For example, one of the most common tasks is to move from one config server to three. To accomplish this, shut down your mongos processes, your config server, and all your shards. Copy the data directory of your config server to the two new config servers (so that there is an identical data directory on all three servers). Now, start up all three config servers and the shards. Then start each of the mongos processes with --configdb pointing to all three config servers.

Balancing Data

In general, MongoDB automatically takes care of balancing data. This section covers how to enable and disable this automatic balancing as well as how to intervene in the balancing process.

The Balancer

Turning off the balancer is a prerequisite to nearly any administrative activity. There is a shell helper to make this easier:

> sh.setBalancerState(false)

With the balancer off a new balancing round will not begin, but it will not force an ongoing balancing round to stop immediately: migrations generally cannot stop on a dime. Thus, you should check the config.locks collection to see whether or not a balancing round is still in progress:

> db.locks.findOne({"_id" : "balancer"})["state"]
0

A state of 0 means that no balancing round is in progress. See The Balancer for an explanation of the balancer states.
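
If you need to wait for an in-progress balancing round to finish before starting maintenance, a small polling loop in the shell will do it. This is only a sketch and assumes you are connected to a mongos:

> use config
> while (db.locks.findOne({"_id" : "balancer"}).state != 0) {
...     print("waiting for balancing round to finish...")
...     sleep(1000)   // check once a second
... }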

Balancing puts load on your system: the destination shard must query the source shard for all the documents in a chunk, insert them, and then the source shard must delete them. There are two circumstances in particular where migrations can cause performance problems:

  1. Using a hotspot shard key will force constant migrations (as all new chunks will be created on the hotspot). Your system must have the capacity to handle the flow of data coming off of your hotspot shard.

  2. Adding a new shard will trigger a stream of migrations as the balancer attempts to populate it.

If you find that migrations are affecting your application’s performance, you can schedule a window for balancing in the config.settings collection. Run the following update to only allow balancing between 1 p.m. and 4 p.m.:

> db.settings.update({"_id" : "balancer"}, 
... {"$set" : {"activeWindow" : {"start" : "13:00", "stop" : "16:00"}}}, 
... true )

If you set a balancing window, monitor it closely to ensure that mongos can actually keep your cluster balanced in the time that you have allotted it.
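
If you later decide to get rid of the window, remove the "activeWindow" field (again, through mongos) and the balancer will go back to running whenever it sees fit:

> db.settings.update({"_id" : "balancer"}, {"$unset" : {"activeWindow" : true}})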

You must be careful if you plan to combine manual balancing with the automatic balancer, since the automatic balancer always determines what to move based on the current state of the set and does not take into account the set’s history. For example, suppose you have shardA and shardB, each holding 500 chunks. shardA is getting a lot of writes, so you turn off the balancer and move 30 of the most active chunks to shardB. If you turn the balancer back on at this point, it will immediately swoop in and move 30 chunks (possibly a different 30) back from shardB to shardA to balance the chunk counts.

To prevent this, move 30 quiescent chunks from shardB to shardA before starting the balancer. That way there will be no imbalance between the shards and the balancer will be happy to leave things as they are. Alternatively, you could perform 30 splits on shardA’s chunks to even out the chunk counts.

Note that the balancer only uses number of chunks as a metric, not size of data. Thus, a shard with a few large chunks may end up as the target of a migration from a shard with many small chunks (but a smaller data size).

Changing Chunk Size

There can be anywhere from zero to millions of documents in a chunk. Generally, the larger a chunk is, the longer it takes to migrate to another shard. In Chapter 13, we used a chunk size of 1 MB, so that we could see chunk movement easily and quickly. This is generally impractical in a live system. MongoDB would be doing a lot of unnecessary work to keep shards within a few megabytes of each other in size. By default, chunks are 64 MB, which is generally a good balance between ease of migration and migratory churn.

Sometimes you may find that migrations are taking too long with 64 MB chunks. To speed them up, you can decrease your chunk size. To do this, connect to mongos through the shell and update the config.settings collection:

> db.settings.findOne()
{
    "_id" : "chunksize", 
    "value" : 64 
}
> db.settings.save({"_id" : "chunksize", "value" : 32})

The previous update would change your chunk size to 32 MB. Existing chunks would not be changed immediately, but as splits occurred chunks would trend toward that size. mongos processes will automatically load the new chunk size value.

Note that this is a cluster-wide setting: it affects all collections and databases. Thus, if you need a small chunk size for one collection and a large chunk size for another, you may have to compromise with a chunk size in between the two ideals (or put the collections in different clusters).

If MongoDB is doing too many migrations or your documents are large, you may want to increase your chunk size.

Moving Chunks

As mentioned earlier, all the data in a chunk lives on a certain shard. If that shard ends up with more chunks than the other shards, MongoDB will move some chunks off it. Moving a chunk is called a migration and is how MongoDB balances data across your cluster.

You can manually move chunks using the moveChunk shell helper:

> sh.moveChunk("test.users", {"user_id" : NumberLong("1844674407370955160")}, 
... "spock") 
{ "millis" : 4079, "ok" : 1 }

This would move the chunk containing the document with "user_id" of 1844674407370955160 to the shard named "spock". You must use the shard key to find which chunk to move ("user_id", in this case). Generally, the easiest way to specify a chunk is by its lower bound, although any value in the chunk will work (the upper bound will not, as it is not actually in the chunk). This command will move the chunk before returning, so it may take a while to run. The logs are the best place to see what it is doing if it takes a long time.

If a chunk is larger than the max chunk size, mongos will refuse to move it:

> sh.moveChunk("test.users", {"user_id" : NumberLong("1844674407370955160")}, 
... "spock")
{
    "cause" : {
        "chunkTooBig" : true,
        "estimatedChunkSize" : 2214960,
        "ok" : 0,
        "errmsg" : "chunk too big to move"
    },
    "ok" : 0,
    "errmsg" : "move failed"
}

In this case, you must manually split the chunk before moving it, using the sh.splitAt() helper:

> db.chunks.find({"ns" : "test.users", 
... "min.user_id" : NumberLong("1844674407370955160")})
{
    "_id" : "test.users-user_id_NumberLong("1844674407370955160")", 
    "ns" : "test.users", 
    "min" : { "user_id" : NumberLong("1844674407370955160") }, 
    "max" : { "user_id" : NumberLong("2103288923412120952") }, 
    "shard" : "test-rs2" 
}
> sh.splitAt("test.ips", {"user_id" : NumberLong("2000000000000000000")})
{ "ok" : 1 }
> db.chunks.find({"ns" : "test.users", 
... "min.user_id" : {"$gt" : NumberLong("1844674407370955160")},
... "max.user_id" : {"$lt" : NumberLong("2103288923412120952")}})
{ 
    "_id" : "test.users-user_id_NumberLong("1844674407370955160")", 
    "ns" : "test.users", 
    "min" : { "user_id" : NumberLong("1844674407370955160") }, 
    "max" : { "user_id" : NumberLong("2000000000000000000") }, 
    "shard" : "test-rs2" 
}
{ 
    "_id" : "test.users-user_id_NumberLong("2000000000000000000")", 
    "ns" : "test.users", 
    "min" : { "user_id" : NumberLong("2000000000000000000") }, 
    "max" : { "user_id" : NumberLong("2103288923412120952") }, 
    "shard" : "test-rs2" 
}

Once the chunk has been split into smaller pieces, it should be movable. Alternatively, you can raise the max chunk size and then move it, but you should break up large chunks whenever possible. Sometimes, though, they cannot be broken up: these are called jumbo chunks.

Jumbo Chunks

Suppose you choose the "date" field as your shard key. The "date" field in this collection is a string that looks like "year/month/day", which means that mongos can create at most one chunk per day. This works fine for a while, until your application suddenly goes viral and gets a thousand times its typical traffic for one day.

This day’s chunk is going to be much larger than any other day’s, but it is also completely unsplittable because every document has the same value for the shard key.

Once a chunk is larger than the max chunk size set in config.settings, the balancer will not be allowed to move the chunk. These unsplittable, unmovable chunks are called jumbo chunks and they are inconvenient to deal with.

Let’s take an example. Suppose there are three shards, shard1, shard2, and shard3. If you use the hotspot shard key pattern described in Ascending Shard Keys, all your writes will be going to one shard, say shard1. mongos will try to balance the number of chunks evenly between the shards. But the only chunks that the balancer can move are the non-jumbo chunks, so it will migrate all the small chunks off the hotspot shard.

Now all the shards will have roughly the same number of chunks, but all of shard2 and shard3’s chunks will be less than 64 MB in size. And if jumbo chunks are being created, more and more of shard1’s chunks will be more than 64 MB in size. Thus, shard1 will fill up a lot faster than the other two shards, even though the number of chunks is perfectly balanced between the three.

Thus, one of the indicators that you have jumbo chunk problems is that one shard’s size is growing much faster than the others. You can also look at sh.status() to see if you have jumbo chunks: they will be marked with a "jumbo" attribute:

> sh.status()
...
    { "x" : -7 } -->> { "x" : 5 } on : shard0001 
    { "x" : 5 } -->> { "x" : 6 } on : shard0001 jumbo
    { "x" : 6 } -->> { "x" : 7 } on : shard0001 jumbo
    { "x" : 7 } -->> { "x" : 339 } on : shard0001
...

You can use the dataSize command to check chunk sizes.

First, we use the config.chunks collection to find the chunk ranges:

> use config
> var chunks = db.chunks.find({"ns" : "acme.analytics"}).toArray()

Use these chunk ranges to find possible jumbo chunks:

> use acme
> db.runCommand({"dataSize" : "acme.analytics",
... "keyPattern" : {"date" : 1}, // shard key
... "min" : chunks[0].min, 
... "max" : chunks[0].max})
{ "size" : 11270888, "numObjects" : 128081, "millis" : 100, "ok" : 1 }

Be careful: the dataSize command does have to scan the chunk’s data to figure out how big it is. If you can, narrow down your search by using your knowledge of your data: were jumbo chunks created on a certain date? For example, if there was a really busy day on November 1, look for chunks with that day in their shard key range. If you’re using GridFS and sharding by "files_id", you can look at the fs.files collection to find a file’s size.
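
If you do need to check a range of chunks, a short shell loop over the chunk documents can flag the suspiciously large ones. This is only a sketch, reusing the chunks array and the acme.analytics example from above; it scans every chunk you give it, so it can be slow:

> use acme
> chunks.forEach(function(chunk) {
...     var ds = db.runCommand({"dataSize" : "acme.analytics",
...         "keyPattern" : {"date" : 1},    // shard key
...         "min" : chunk.min,
...         "max" : chunk.max})
...     // flag anything bigger than the default 64 MB max chunk size
...     if (ds.size > 64 * 1024 * 1024) {
...         print(tojson(chunk.min) + " -->> " + tojson(chunk.max) +
...             " : " + ds.size + " bytes on " + chunk.shard)
...     }
... })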

Distributing jumbo chunks

To fix a cluster thrown off-balance by jumbo chunks, you must evenly distribute them among the shards.

This is a complex manual process, but should not cause any downtime (it may cause slowness, as you’ll be migrating a lot of data). In the description below, the shard with the jumbo chunks is referred to as the “from” shard. The shards that the jumbo chunks are migrated to are called the “to” shards. Note that you may have multiple “from” shards that you wish to move chunks off of. Repeat these steps for each:

  1. Turn off the balancer. You don’t want the balancer trying to “help” during this process:

    > sh.setBalancerState(false)
  2. MongoDB will not allow you to move chunks larger than the max chunk size, so temporarily raise the chunk size. Make a note of what your original chunk size is and then change it to something large, like 10,000. Chunk size is specified in megabytes:

    > use config
    > db.settings.findOne({"_id" : "chunksize"})
    {
        "_id" : "chunksize", 
        "value" : 64
    }
    > db.settings.save({"_id" : "chunksize", "value" : 10000})
  3. Use the moveChunk command to move jumbo chunks off the “from” shard. If you are concerned about the impact migrations will have on your application’s performance, use the secondaryThrottle option to prevent them from happening too quickly:

    > db.adminCommand({"moveChunk" : "acme.analytics", 
    ... "find" : {"date" : new Date("10/23/2012")}, 
    ... "to" : "shard0002", 
    ... "secondaryThrottle" : true})

    secondaryThrottle forces migrations to periodically wait until a majority of secondaries have replicated the migration. It only works if you are running with shards that are replica sets (not standalone servers).

  4. Run splitChunk on the remaining chunks on the donor shard until it has roughly the same number of chunks as the other shards.

  5. Set chunk size back to its original value:

    > db.settings.save({"_id" : "chunksize", "value" : 64})
  6. Turn on the balancer:

    > sh.setBalancerState(true)

When the balancer is turned back on, it still cannot move the jumbo chunks, as they are essentially held in place by their size.

Preventing jumbo chunks

As the amount of data you are storing grows, the manual process described in the previous section becomes unsustainable. Thus, if you’re having problems with jumbo chunks, you should make it a priority to prevent them from forming.

To prevent jumbo chunks, modify your shard key to have more granularity. You want almost every document to have a unique value for the shard key, or at least never have more than chunksize-worth of data with a single shard key value.

For example, the year/month/day key described earlier can quickly be made finer grained by adding hours, minutes, and seconds. Similarly, if you’re sharding on a coarse-grained key like log level, add a second field to your shard key with a lot of granularity, such as an MD5 hash or UUID. Then you can always split a chunk, even if the first field is the same for many documents.
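
For example, a hypothetical logging collection sharded only on a log level field would produce jumbo chunks almost immediately, because there are only a handful of levels; adding a high-cardinality second field keeps every chunk splittable (the namespace and field names here are made up):

> // "level" alone is too coarse; the high-cardinality "uuid" field keeps chunks splittable
> sh.shardCollection("acme.logs", {"level" : 1, "uuid" : 1})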

Refreshing Configurations

As a final tip, sometimes mongos will not update its configuration correctly from the config servers. If you ever get a configuration that you don’t expect or a mongos seems to be out of date or cannot find data that you know is there, use the flushRouterConfig command to manually clear all caches:

> db.adminCommand({"flushRouterConfig" : 1})

If flushRouterConfig does not work, restarting all your mongos or mongod processes clears any possible cache.
