Chapter 17. Sharding Administration

As with replica sets, you have a number of options for administering sharded clusters. Manual administration is one option. These days, it is becoming increasingly common to use tools such as Ops Manager, Cloud Manager, and the Atlas Database-as-a-Service (DBaaS) offering for all cluster administration. In this chapter, we will demonstrate how to administer a sharded cluster manually, including:

  • Inspecting the cluster’s state: who its members are, where data is held, and what connections are open

  • Adding, removing, and changing members of a cluster

  • Administering data movement and manually moving data

Seeing the Current State

There are several helpers available to find out what data is where, what the shards are, and what the cluster is doing.

Getting a Summary with sh.status()

sh.status() gives you an overview of your shards, databases, and sharded collections. If you have a small number of chunks, it will print a breakdown of which chunks are where as well. Otherwise it will simply give the collection’s shard key and report how many chunks each shard has:

> sh.status()
--- Sharding Status --- 
sharding version: {
  "_id" : 1,
  "minCompatibleVersion" : 5,
  "currentVersion" : 6,
  "clusterId" : ObjectId("5bdf51ecf8c192ed922f3160")
}
shards:
  {  "_id" : "shard01",  
     "host" : "shard01/localhost:27018,localhost:27019,localhost:27020",  
     "state" : 1 }
  {  "_id" : "shard02",  
     "host" : "shard02/localhost:27021,localhost:27022,localhost:27023",  
     "state" : 1 }
  {  "_id" : "shard03",  
     "host" : "shard03/localhost:27024,localhost:27025,localhost:27026",  
     "state" : 1 }
active mongoses:
  "4.0.3" : 1
autosplit:
  Currently enabled: yes
balancer:
  Currently enabled:  yes
  Currently running:  no
  Failed balancer rounds in last 5 attempts:  0
  Migration Results for the last 24 hours: 
    6 : Success
  databases:
    {  "_id" : "config",  "primary" : "config",  "partitioned" : true }
      config.system.sessions
        shard key: { "_id" : 1 }
        unique: false
        balancing: true
        chunks:
          shard01	1
          { "_id" : { "$minKey" : 1 } } -->> 
          { "_id" : { "$maxKey" : 1 } } on : shard01 Timestamp(1, 0) 
    {  "_id" : "video",  "primary" : "shard02",  "partitioned" : true,
       "version" : 
          {  "uuid" : UUID("3d83d8b8-9260-4a6f-8d28-c3732d40d961"),  
             "lastMod" : 1 } }
       video.movies
         shard key: { "imdbId" : "hashed" }
         unique: false
         balancing: true
         chunks:
           shard01 3
           shard02 4
           shard03 3
           { "imdbId" : { "$minKey" : 1 } } -->> 
               { "imdbId" : NumberLong("-7262221363006655132") } on : 
               shard01 Timestamp(2, 0) 
           { "imdbId" : NumberLong("-7262221363006655132") } -->> 
               { "imdbId" : NumberLong("-5315530662268120007") } on : 
               shard03 Timestamp(3, 0) 
           { "imdbId" : NumberLong("-5315530662268120007") } -->> 
               { "imdbId" : NumberLong("-3362204802044524341") } on : 
               shard03 Timestamp(4, 0) 
           { "imdbId" : NumberLong("-3362204802044524341") } -->> 
               { "imdbId" : NumberLong("-1412311662519947087") } 
               on : shard01 Timestamp(5, 0) 
           { "imdbId" : NumberLong("-1412311662519947087") } -->> 
               { "imdbId" : NumberLong("524277486033652998") } on : 
               shard01 Timestamp(6, 0) 
           { "imdbId" : NumberLong("524277486033652998") } -->> 
               { "imdbId" : NumberLong("2484315172280977547") } on : 
               shard03 Timestamp(7, 0) 
           { "imdbId" : NumberLong("2484315172280977547") } -->> 
               { "imdbId" : NumberLong("4436141279217488250") } on : 
               shard02 Timestamp(7, 1) 
           { "imdbId" : NumberLong("4436141279217488250") } -->> 
               { "imdbId" : NumberLong("6386258634539951337") } on : 
               shard02 Timestamp(1, 7) 
           { "imdbId" : NumberLong("6386258634539951337") } -->> 
               { "imdbId" : NumberLong("8345072417171006784") } on : 
               shard02 Timestamp(1, 8) 
           { "imdbId" : NumberLong("8345072417171006784") } -->> 
               { "imdbId" : { "$maxKey" : 1 } } on : 
               shard02 Timestamp(1, 9)

Once there are more than a few chunks, sh.status() will summarize the chunk stats instead of printing each chunk. To see all chunks, run sh.status(true) (the true tells sh.status() to be verbose).

All the information sh.status() shows is gathered from your config database.

Seeing Configuration Information

All of the configuration information about your cluster is kept in collections in the config database on the config servers. The shell has several helpers for exposing this information in a more readable way. However, you can always directly query the config database for metadata about your cluster.

Warning

Never connect directly to your config servers, as you do not want to take the chance of accidentally changing or removing config server data. Instead, connect to the mongos process and use the config database to see its data, as you would for any other database:

> use config

If you manipulate config data through mongos (instead of connecting directly to the config servers), mongos will ensure that all of your config servers stay in sync and prevent various dangerous actions like accidentally dropping the config database.

In general, you should not directly change any data in the config database (exceptions are noted in the following sections). If you change anything, you will generally have to restart all of your mongos servers to see its effect.

There are several collections in the config database. This section covers what each one contains and how it can be used.

config.shards

The shards collection keeps track of all the shards in the cluster. A typical document in the shards collection might look something like this:

> db.shards.find()
{ "_id" : "shard01", 
  "host" : "shard01/localhost:27018,localhost:27019,localhost:27020", 
  "state" : 1 }
{ "_id" : "shard02", 
  "host" : "shard02/localhost:27021,localhost:27022,localhost:27023", 
  "state" : 1 }
{ "_id" : "shard03", 
  "host" : "shard03/localhost:27024,localhost:27025,localhost:27026", 
  "state" : 1 }

The shard’s "_id" is picked up from the replica set name, so each replica set in your cluster must have a unique name.

When you update your replica set configuration (e.g., adding or removing members), the "host" field will be updated automatically.

config.databases

The databases collection keeps track of all of the databases, sharded and not, that the cluster knows about:

> db.databases.find()
{ "_id" : "video", "primary" : "shard02", "partitioned" : true, 
  "version" : { "uuid" : UUID("3d83d8b8-9260-4a6f-8d28-c3732d40d961"), 
  "lastMod" : 1 } }

If enableSharding has been run on a database, "partitioned" will be true. The "primary" is the database’s “home base.” By default, all new collections in that database will be created on the database’s primary shard.
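
For example, to list only the databases that have sharding enabled, you can filter on this flag. This is a minimal sketch using the fields shown above:

> db.databases.find({"partitioned" : true}, {"_id" : 1, "primary" : 1})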

config.collections

The collections collection keeps track of all sharded collections (nonsharded collections are not shown). A typical document looks something like this:

> db.collections.find().pretty()
{
    "_id" : "config.system.sessions",
    "lastmodEpoch" : ObjectId("5bdf53122ad9c6907510c22d"),
    "lastmod" : ISODate("1970-02-19T17:02:47.296Z"),
    "dropped" : false,
    "key" : {
        "_id" : 1
    },
    "unique" : false,
    "uuid" : UUID("7584e4cd-fac4-4305-a9d4-bd73e93621bf")
}
{
    "_id" : "video.movies",
    "lastmodEpoch" : ObjectId("5bdf72c021b6e3be02fabe0c"),
    "lastmod" : ISODate("1970-02-19T17:02:47.305Z"),
    "dropped" : false,
    "key" : {
        "imdbId" : "hashed"
    },
    "unique" : false,
    "uuid" : UUID("e6580ffa-fcd3-418f-aa1a-0dfb71bc1c41")
}

The important fields are:

"_id"

The namespace of the collection.

"key"

The shard key. In this case, it is a hashed shard key on "imdbId".

"unique"

Indicates whether the shard key is a unique index. Here it is false; by default, the shard key is not unique.
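
For example, to look up a single collection’s shard key without scanning the sh.status() output, you can query this collection directly (a minimal sketch):

> db.collections.findOne({"_id" : "video.movies"}).key
{ "imdbId" : "hashed" }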

config.chunks

The chunks collection keeps a record of each chunk in all the collections. A typical document in the chunks collection looks something like this:

> db.chunks.find().skip(1).limit(1).pretty()
{
    "_id" : "video.movies-imdbId_MinKey",
    "lastmod" : Timestamp(2, 0),
    "lastmodEpoch" : ObjectId("5bdf72c021b6e3be02fabe0c"),
    "ns" : "video.movies",
    "min" : {
        "imdbId" : { "$minKey" : 1 }
    },
    "max" : {
        "imdbId" : NumberLong("-7262221363006655132")
    },
    "shard" : "shard01",
    "history" : [
        {
            "validAfter" : Timestamp(1541370579, 3096),
            "shard" : "shard01"
        }
    ]
}

The most useful fields are:

"_id"

The unique identifier for the chunk. Generally this is the namespace, shard key, and lower chunk boundary.

"ns"

The collection that this chunk is from.

"min"

The smallest value in the chunk’s range (inclusive).

"max"

All values in the chunk are smaller than this value.

"shard"

Which shard the chunk resides on.

The "lastmod" field tracks chunk versioning. For example, if the chunk "video.movies-imdbId_MinKey" were split into two chunks, we’d want a way of distinguishing the new, smaller "video.movies-imdbId_MinKey" chunks from their previous incarnation as a single chunk. Thus, the first component of the Timestamp value reflects the number of times a chunk has been migrated to a new shard. The second component of this value reflects the number of splits. The "lastmodEpoch" field specifies the collection’s creation epoch. It is used to differentiate requests for the same collection name in the cases where the collection was dropped and immediately recreated.

sh.status() uses the config.chunks collection to gather most of its information.
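
For example, to see how many chunks of a collection live on each shard, you can aggregate over config.chunks yourself rather than reading the sh.status() summary. This is a minimal sketch:

> db.chunks.aggregate([
    { "$match" : { "ns" : "video.movies" } },
    { "$group" : { "_id" : "$shard", "nChunks" : { "$sum" : 1 } } },
    { "$sort" : { "nChunks" : -1 } }
  ])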

config.changelog

The changelog collection is useful for keeping track of what a cluster is doing, since it records all of the splits and migrations that have occurred.
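
For example, to get a quick picture of recent activity, you can group the changelog by event type. This is a minimal sketch:

> db.changelog.aggregate([
    { "$group" : { "_id" : "$what", "count" : { "$sum" : 1 } } },
    { "$sort" : { "count" : -1 } }
  ])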

Splits are recorded in a document that looks like this:

> db.changelog.find({what: "split"}).pretty()
{
    "_id" : "router1-2018-11-05T09:58:58.915-0500-5be05ab2f8c192ed922ffbe7",
    "server" : "bob",
    "clientAddr" : "127.0.0.1:64621",
    "time" : ISODate("2018-11-05T14:58:58.915Z"),
    "what" : "split",
    "ns" : "video.movies",
    "details" : {
        "before" : {
            "min" : {
                "imdbId" : NumberLong("2484315172280977547")
            },
            "max" : {
                "imdbId" : NumberLong("4436141279217488250")
            },
            "lastmod" : Timestamp(9, 1),
            "lastmodEpoch" : ObjectId("5bdf72c021b6e3be02fabe0c")
        },
        "left" : {
            "min" : {
                "imdbId" : NumberLong("2484315172280977547")
            },
            "max" : {
                "imdbId" : NumberLong("3459137475094092005")
            },
            "lastmod" : Timestamp(9, 2),
            "lastmodEpoch" : ObjectId("5bdf72c021b6e3be02fabe0c")
        },
        "right" : {
            "min" : {
                "imdbId" : NumberLong("3459137475094092005")
            },
            "max" : {
                "imdbId" : NumberLong("4436141279217488250")
            },
            "lastmod" : Timestamp(9, 3),
            "lastmodEpoch" : ObjectId("5bdf72c021b6e3be02fabe0c")
        }
    }
}

The "details" field gives information about what the original document looked like and what it was split into.

This output shows what the first chunk split of a collection looks like. Note that the second component of "lastmod" for each new chunk was updated so that the values are Timestamp(9, 2) and Timestamp(9, 3), respectively.

Migrations are a bit more complicated and actually create four separate changelog documents: one noting the start of the migrate, one for the “from” shard, one for the “to” shard, and one for the commit that occurs when the migration is finalized. The middle two documents are of interest because these give a breakdown of how long each step in the process took. This can give you an idea of whether it’s the disk, network, or something else that is causing a bottleneck on migrates.

For example, the document created by the “to” shard looks like this:

> db.changelog.findOne({what: "moveChunk.to"})
{
    "_id" : "router1-2018-11-04T17:29:39.702-0500-5bdf72d32ad9c69075112f08",
    "server" : "bob",
    "clientAddr" : "",
    "time" : ISODate("2018-11-04T22:29:39.702Z"),
    "what" : "moveChunk.to",
    "ns" : "video.movies",
    "details" : {
        "min" : {
            "imdbId" : { "$minKey" : 1 }
        },
        "max" : {
            "imdbId" : NumberLong("-7262221363006655132")
        },
        "step 1 of 6" : 965,
        "step 2 of 6" : 608,
        "step 3 of 6" : 15424,
        "step 4 of 6" : 0,
        "step 5 of 6" : 72,
        "step 6 of 6" : 258,
        "note" : "success"
    }
}

Each of the steps listed in "details" is timed, and the "step N of 6" fields show how long each step took, in milliseconds.

When the “to” shard receives a moveChunk command from the “from” shard, it:

  1. Migrates indexes. If this shard has never held chunks from the migrated collection before, it needs to know what fields are indexed. If this isn’t the first time a chunk from this collection is being moved to this shard, then this should be a no-op.

  2. Deletes any existing data in the chunk range. There might be data left over from a failed migration or restore procedure that we wouldn’t want to interfere with the current data.

  3. Copies all documents in the chunk from the “from” shard.

  4. Replays any operations that happened to these documents during the copy (on the “to” shard).

  5. Waits for the newly migrated data to be replicated to a majority of the servers in the “to” shard’s replica set.

  6. Commits the migrate by changing the chunk’s metadata to say that it lives on the “to” shard.

The “from” shard’s changelog document is similar to the “to” shard’s, but the steps are a bit different. It looks like this:

> db.changelog.find({what: "moveChunk.from", "details.max.imdbId": 
  NumberLong("-7262221363006655132")}).pretty()
{
    "_id" : "router1-2018-11-04T17:29:39.753-0500-5bdf72d321b6e3be02fabf0b",
    "server" : "bob",
    "clientAddr" : "127.0.0.1:64743",
    "time" : ISODate("2018-11-04T22:29:39.753Z"),
    "what" : "moveChunk.from",
    "ns" : "video.movies",
    "details" : {
        "min" : {
            "imdbId" : { "$minKey" : 1 }
        },
        "max" : {
            "imdbId" : NumberLong("-7262221363006655132")
        },
        "step 1 of 6" : 0,
        "step 2 of 6" : 4,
        "step 3 of 6" : 191,
        "step 4 of 6" : 17000,
        "step 5 of 6" : 341,
        "step 6 of 6" : 39,
        "to" : "shard01",
        "from" : "shard02",
        "note" : "success"
    }
}

When the “from” shard receives a moveChunk command from the mongos, it:

  1. Checks the command parameters.

  2. Confirms with the config servers that it can acquire a distributed lock for the migrate.

  3. Tries to contact the “to” shard.

  4. Copies the data. This is referred to and logged as “the critical section.”

  5. Coordinates with the “to” shard and config servers to confirm the migration.

Note that the “to” and “from” shards must be in close communication starting at “step 4 of 6”: the shards talk directly to one another and to the config servers to perform the migration. If the “from” server has flaky network connectivity during the final steps, it may end up in a state where it cannot undo the migration and cannot move forward with it. In this case, the mongod will shut down.
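
If you want a rough sense of where migration time goes across many migrations, you can aggregate these step timings straight from the changelog. This is a minimal sketch (the quoted step names contain spaces, exactly as they appear in the documents above):

> db.changelog.aggregate([
    { "$match" : { "what" : "moveChunk.from", "details.note" : "success" } },
    { "$group" : {
        "_id" : "$ns",
        "migrations" : { "$sum" : 1 },
        "avgStep4Millis" : { "$avg" : "$details.step 4 of 6" },
        "avgStep5Millis" : { "$avg" : "$details.step 5 of 6" }
    } }
  ])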

config.settings

This collection contains documents representing the current balancer settings and chunk size. By changing the documents in this collection, you can turn the balancer on or off or change the chunk size. Note that you should always connect to mongos, not the config servers directly, to change values in this collection.

Tracking Network Connections

There are a lot of connections between the components of a cluster. This section covers some sharding-specific information (see Chapter 24 for more information on networking).

Getting Connection Statistics

The command connPoolStats returns information regarding the open outgoing connections from the current database instance to other members of the sharded cluster or replica set.

To avoid interference with any running operations, connPoolStats does not take any locks. As such, the counts may change slightly as connPoolStats gathers information, resulting in slight differences between the hosts and pools connection counts:

> db.adminCommand({"connPoolStats": 1})
{
    "numClientConnections" : 10,
    "numAScopedConnections" : 0,
    "totalInUse" : 0,
    "totalAvailable" : 13,
    "totalCreated" : 86,
    "totalRefreshing" : 0,
    "pools" : {
        "NetworkInterfaceTL-TaskExecutorPool-0" : {
            "poolInUse" : 0,
            "poolAvailable" : 2,
            "poolCreated" : 2,
            "poolRefreshing" : 0,
            "localhost:27027" : {
                "inUse" : 0,
                "available" : 1,
                "created" : 1,
                "refreshing" : 0
            },
            "localhost:27019" : {
                "inUse" : 0,
                "available" : 1,
                "created" : 1,
                "refreshing" : 0
            }
        },
        "NetworkInterfaceTL-ShardRegistry" : {
            "poolInUse" : 0,
            "poolAvailable" : 1,
            "poolCreated" : 13,
            "poolRefreshing" : 0,
            "localhost:27027" : {
                "inUse" : 0,
                "available" : 1,
                "created" : 13,
                "refreshing" : 0
            }
        },
        "global" : {
            "poolInUse" : 0,
            "poolAvailable" : 10,
            "poolCreated" : 71,
            "poolRefreshing" : 0,
            "localhost:27026" : {
                "inUse" : 0,
                "available" : 1,
                "created" : 8,
                "refreshing" : 0
            },
            "localhost:27027" : {
                "inUse" : 0,
                "available" : 1,
                "created" : 1,
                "refreshing" : 0
            },
            "localhost:27023" : {
                "inUse" : 0,
                "available" : 1,
                "created" : 7,
                "refreshing" : 0
            },
            "localhost:27024" : {
                "inUse" : 0,
                "available" : 1,
                "created" : 6,
                "refreshing" : 0
            },
            "localhost:27022" : {
                "inUse" : 0,
                "available" : 1,
                "created" : 9,
                "refreshing" : 0
            },
            "localhost:27019" : {
                "inUse" : 0,
                "available" : 1,
                "created" : 8,
                "refreshing" : 0
            },
            "localhost:27021" : {
                "inUse" : 0,
                "available" : 1,
                "created" : 8,
                "refreshing" : 0
            },
            "localhost:27025" : {
                "inUse" : 0,
                "available" : 1,
                "created" : 9,
                "refreshing" : 0
            },
            "localhost:27020" : {
                "inUse" : 0,
                "available" : 1,
                "created" : 8,
                "refreshing" : 0
            },
            "localhost:27018" : {
                "inUse" : 0,
                "available" : 1,
                "created" : 7,
                "refreshing" : 0
            }
        }
    },
    "hosts" : {
        "localhost:27026" : {
            "inUse" : 0,
            "available" : 1,
            "created" : 8,
            "refreshing" : 0
        },
        "localhost:27027" : {
            "inUse" : 0,
            "available" : 3,
            "created" : 15,
            "refreshing" : 0
        },
        "localhost:27023" : {
            "inUse" : 0,
            "available" : 1,
            "created" : 7,
            "refreshing" : 0
        },
        "localhost:27024" : {
            "inUse" : 0,
            "available" : 1,
            "created" : 6,
            "refreshing" : 0
        },
        "localhost:27022" : {
            "inUse" : 0,
            "available" : 1,
            "created" : 9,
            "refreshing" : 0
        },
        "localhost:27019" : {
            "inUse" : 0,
            "available" : 2,
            "created" : 9,
            "refreshing" : 0
        },
        "localhost:27021" : {
            "inUse" : 0,
            "available" : 1,
            "created" : 8,
            "refreshing" : 0
        },
        "localhost:27025" : {
            "inUse" : 0,
            "available" : 1,
            "created" : 9,
            "refreshing" : 0
        },
        "localhost:27020" : {
            "inUse" : 0,
            "available" : 1,
            "created" : 8,
            "refreshing" : 0
        },
        "localhost:27018" : {
            "inUse" : 0,
            "available" : 1,
            "created" : 7,
            "refreshing" : 0
        }
    },
    "replicaSets" : {
        "shard02" : {
            "hosts" : [
                {
                    "addr" : "localhost:27021",
                    "ok" : true,
                    "ismaster" : true,
                    "hidden" : false,
                    "secondary" : false,
                    "pingTimeMillis" : 0
                },
                {
                    "addr" : "localhost:27022",
                    "ok" : true,
                    "ismaster" : false,
                    "hidden" : false,
                    "secondary" : true,
                    "pingTimeMillis" : 0
                },
                {
                    "addr" : "localhost:27023",
                    "ok" : true,
                    "ismaster" : false,
                    "hidden" : false,
                    "secondary" : true,
                    "pingTimeMillis" : 0
                }
            ]
        },
        "shard03" : {
            "hosts" : [
                {
                    "addr" : "localhost:27024",
                    "ok" : true,
                    "ismaster" : false,
                    "hidden" : false,
                    "secondary" : true,
                    "pingTimeMillis" : 0
                },
                {
                    "addr" : "localhost:27025",
                    "ok" : true,
                    "ismaster" : true,
                    "hidden" : false,
                    "secondary" : false,
                    "pingTimeMillis" : 0
                },
                {
                    "addr" : "localhost:27026",
                    "ok" : true,
                    "ismaster" : false,
                    "hidden" : false,
                    "secondary" : true,
                    "pingTimeMillis" : 0
                }
            ]
        },
        "configRepl" : {
            "hosts" : [
                {
                    "addr" : "localhost:27027",
                    "ok" : true,
                    "ismaster" : true,
                    "hidden" : false,
                    "secondary" : false,
                    "pingTimeMillis" : 0
                }
            ]
        },
        "shard01" : {
            "hosts" : [
                {
                    "addr" : "localhost:27018",
                    "ok" : true,
                    "ismaster" : false,
                    "hidden" : false,
                    "secondary" : true,
                    "pingTimeMillis" : 0
                },
                {
                    "addr" : "localhost:27019",
                    "ok" : true,
                    "ismaster" : true,
                    "hidden" : false,
                    "secondary" : false,
                    "pingTimeMillis" : 0
                },
                {
                    "addr" : "localhost:27020",
                    "ok" : true,
                    "ismaster" : false,
                    "hidden" : false,
                    "secondary" : true,
                    "pingTimeMillis" : 0
                }
            ]
        }
    },
    "ok" : 1,
    "operationTime" : Timestamp(1541440424, 1),
    "$clusterTime" : {
        "clusterTime" : Timestamp(1541440424, 1),
        "signature" : {
            "hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
            "keyId" : NumberLong(0)
        }
    }
}

In this output:

  • "totalAvailable" shows the total number of available outgoing connections from the current mongod/mongos instance to other members of the sharded cluster or replica set.

  • "totalCreated" reports the total number of outgoing connections ever created by the current mongod/mongos instance to other members of the sharded cluster or replica set.

  • "totalInUse" provides the total number of outgoing connections from the current mongod/mongos instance to other members of the sharded cluster or replica set that are currently in use.

  • "totalRefreshing" displays the total number of outgoing connections from the current mongod/mongos instance to other members of the sharded cluster or replica set that are currently being refreshed.

  • "numClientConnections" identifies the number of active and stored outgoing synchronous connections from the current mongod/mongos instance to other members of the sharded cluster or replica set. These represent a subset of the connections reported by "totalAvailable", "totalCreated", and "totalInUse".

  • "numAScopedConnection" reports the number of active and stored outgoing scoped synchronous connections from the current mongod/mongos instance to other members of the sharded cluster or replica set. These represent a subset of the connections reported by "totalAvailable", "totalCreated", and "totalInUse".

  • "pools" shows connection statistics (in use/available/created/refreshing) grouped by the connection pools. A mongod or mongos has two distinct families of outgoing connection pools:

    • DBClient-based pools (the “write path,” identified by the field name "global" in the "pools" document)

    • NetworkInterfaceTL-based pools (the “read path”)

  • "hosts" shows connection statistics (in use/available/created/refreshing) grouped by the hosts. It reports on connections between the current mongod/mongos instance and each member of the sharded cluster or replica set.

You might see connections to other shards in the output of connPoolStats. These indicate that shards are connecting to other shards to migrate data. The primary of one shard will connect directly to the primary of another shard and “suck” its data.

When a migrate occurs, a shard sets up a ReplicaSetMonitor (a process that monitors replica set health) to track the health of the shard on the other side of the migrate. mongod never destroys this monitor, so you may see messages in one replica set’s log about the members of another replica set. This is totally normal and should have no effect on your application.

Limiting the Number of Connections

When a client connects to a mongos, the mongos creates a connection to at least one shard to pass along the client’s request. Thus, every client connection into a mongos yields at least one outgoing connection from mongos to the shards.

If you have many mongos processes, they may create more connections than your shards can handle: by default a mongos will accept up to 65,536 connections (the same as mongod), so if you have 5 mongos processes with 10,000 client connections each, they may be attempting to create 50,000 connections to a shard!

To prevent this, you can pass the --maxConns option to mongos on the command line (or set it in its configuration file) to limit the number of connections it can create. The following formula can be used to calculate the maximum number of connections a shard can handle from a single mongos:

  maxConns = (maxConnsPrimary − (numMembersPerReplicaSet × 3) − (other × 3))
                 / numMongosProcesses

Breaking down the pieces of this formula:

maxConnsPrimary

The maximum number of connections on the primary, typically set to 20,000 to avoid overwhelming the shard with connections from the mongos.

(numMembersPerReplicaSet × 3)

The primary creates a connection to each secondary and each secondary creates two connections to the primary, for a total of three connections.

(other × 3)

Other is the number of miscellaneous processes that may connect to your mongods, such as monitoring or backup agents, direct shell connections (for administration), or connections to other shards for migrations.

numMongosProcesses

The total number of mongos in the sharded cluster.
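
To make the formula concrete, here is a rough worked example in the shell. The numbers are illustrative assumptions, not recommendations:

> var maxConnsPrimary = 20000
> var numMembersPerReplicaSet = 3
> var other = 3
> var numMongosProcesses = 4
> (maxConnsPrimary - (numMembersPerReplicaSet * 3) - (other * 3)) / numMongosProcesses
4995.5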

Note that --maxConns only prevents mongos from creating more than this many connections. It doesn’t do anything particularly helpful when this limit is reached: it will simply block requests, waiting for connections to be “freed.” Thus, you must prevent your application from using this many connections, especially as your number of mongos processes grows.

When a MongoDB instance exits cleanly it closes all connections before stopping. The members that were connected to it will immediately get socket errors on those connections and be able to refresh them. However, if a MongoDB instance suddenly goes offline due to a power loss, crash, or network problems, it probably won’t cleanly close all of its sockets. In this case, other servers in the cluster may be under the impression that their connection is healthy until they try to perform an operation on it. At that point, they will get an error and refresh the connection (if the member is up again at that point).

This is a quick process when there are only a few connections. However, when there are thousands of connections that must be refreshed one by one you can get a lot of errors because each connection to the downed member must be tried, determined to be bad, and reestablished. There isn’t a particularly good way of preventing this, aside from restarting processes that get bogged down in a reconnection storm.

Server Administration

As your cluster grows, you’ll need to add capacity or change configurations. This section covers how to add and remove servers in your cluster.

Adding Servers

You can add new mongos processes at any time. Make sure their --configdb option specifies the correct set of config servers; they will then be immediately available for clients to connect to.

To add new shards, use the addShard command as shown in Chapter 15.

Changing Servers in a Shard

As you use your sharded cluster, you may want to change the servers in individual shards. To change a shard’s membership, connect directly to the shard’s primary (not through the mongos) and issue a replica set reconfig. The cluster configuration will pick up the change and update config.shards automatically. Do not modify config.shards by hand.
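
For example, to add a member to (or remove one from) the shard01 replica set, you would connect to that shard’s primary and use the ordinary replica set helpers. A minimal sketch, where localhost:27028 is a hypothetical new member:

> rs.add("localhost:27028")
> rs.remove("localhost:27020")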

The only exception to this is if you started your cluster with standalone servers as shards, not replica sets.

Changing a shard from a standalone server to a replica set

The easiest way to do this is to add a new, empty replica set shard and then remove the standalone server shard (as discussed in the next section). Migrations will take care of moving your data to the new shard.

Removing a Shard

In general, shards should not be removed from a cluster. If you are regularly adding and removing shards, you are putting a lot more stress on the system than necessary. If you add too many shards it is better to let your system grow into them, not remove them and add them back later. However, if necessary, you can remove shards.

First make sure that the balancer is on. The balancer will be tasked with moving all the data on the shard you want to remove to other shards in a process called draining. To start draining, run the removeShard command. removeShard takes the shard’s name and drains all the chunks on that shard to the other shards:

> db.adminCommand({"removeShard" : "shard03"})
{
    "msg" : "draining started successfully",
    "state" : "started",
    "shard" : "shard03",
    "note" : "you need to drop or movePrimary these databases",
    "dbsToMove" : [ ],
    "ok" : 1,
    "operationTime" : Timestamp(1541450091, 2),
    "$clusterTime" : {
        "clusterTime" : Timestamp(1541450091, 2),
        "signature" : {
            "hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
            "keyId" : NumberLong(0)
        }
    }
}

Draining can take a long time if there are a lot of chunks or large chunks to move. If you have jumbo chunks (see “Jumbo Chunks”), you may have to temporarily increase the chunk size to allow draining to move them.

If you want to keep tabs on how much has been moved, run removeShard again to give you the current status:

> db.adminCommand({"removeShard" : "shard02"})
{
    "msg" : "draining ongoing",
    "state" : "ongoing",
    "remaining" : {
        "chunks" : NumberLong(3),
        "dbs" : NumberLong(0)
    },
    "note" : "you need to drop or movePrimary these databases",
    "dbsToMove" : [ 
          "video"
       ],
    "ok" : 1,
    "operationTime" : Timestamp(1541450139, 1),
    "$clusterTime" : {
        "clusterTime" : Timestamp(1541450139, 1),
        "signature" : {
            "hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
            "keyId" : NumberLong(0)
        }
    }
}

You can run removeShard as many times as you want.

Chunks may have to be split to be moved, so you may see the number of chunks increase in the system during the drain. For example, suppose we have a five-shard cluster with the following chunk distributions:

test-rs0    10
test-rs1    10
test-rs2    10
test-rs3    11
test-rs4    11

This cluster has a total of 52 chunks. If we remove test-rs3, we might end up with:

test-rs0    15
test-rs1    15
test-rs2    15
test-rs4    15

The cluster now has 60 chunks, 19 of which came from shard test-rs3 (11 were there to start and 8 were created from draining splits).

Once all the chunks have been moved, if there are still databases that have the removed shard as their primary, you’ll need to remove them before the shard can be removed. Each database in a sharded cluster has a primary shard. If the shard you want to remove is also the primary of one of the cluster’s databases, removeShard lists the database in the "dbsToMove" field. To finish removing the shard, you must either move the database to a new shard after migrating all data from the shard or drop the database, deleting the associated data files. The output of removeShard will be something like:

> db.adminCommand({"removeShard" : "shard02"})
{
    "msg" : "draining ongoing",
    "state" : "ongoing",
    "remaining" : {
        "chunks" : NumberLong(3),
        "dbs" : NumberLong(0)
    },
    "note" : "you need to drop or movePrimary these databases",
    "dbsToMove" : [ 
          "video"
       ],
    "ok" : 1,
    "operationTime" : Timestamp(1541450139, 1),
    "$clusterTime" : {
        "clusterTime" : Timestamp(1541450139, 1),
        "signature" : {
            "hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
            "keyId" : NumberLong(0)
        }
    }
}

To finish the remove, move the listed databases with the movePrimary command:

> db.adminCommand({"movePrimary" : "video", "to" : "shard01"})
{
    "ok" : 1,
    "operationTime" : Timestamp(1541450554, 12),
    "$clusterTime" : {
        "clusterTime" : Timestamp(1541450554, 12),
        "signature" : {
            "hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
            "keyId" : NumberLong(0)
        }
    }
}

Once you have done this, run removeShard one more time:

> db.adminCommand({"removeShard" : "shard02"})
{
    "msg" : "removeshard completed successfully",
    "state" : "completed",
    "shard" : "shard03",
    "ok" : 1,
    "operationTime" : Timestamp(1541450619, 2),
    "$clusterTime" : {
        "clusterTime" : Timestamp(1541450619, 2),
        "signature" : {
            "hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
            "keyId" : NumberLong(0)
        }
    }
}

This is not strictly necessary, but it confirms that you have completed the process. If there are no databases that have this shard as their primary, you will get this response as soon as all chunks have been migrated off the shard.

Warning

Once you have started a shard draining, there is no built-in way to stop it.

Balancing Data

In general, MongoDB automatically takes care of balancing data. This section covers how to enable and disable this automatic balancing as well as how to intervene in the balancing process.

The Balancer

Turning off the balancer is a prerequisite to nearly any administrative activity. There is a shell helper to make this easier:

> sh.setBalancerState(false)
{
    "ok" : 1,
    "operationTime" : Timestamp(1541450923, 2),
    "$clusterTime" : {
        "clusterTime" : Timestamp(1541450923, 2),
        "signature" : {
            "hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
            "keyId" : NumberLong(0)
        }
    }
}

With the balancer off a new balancing round will not begin, but turning it off will not force an ongoing balancing round to stop immediately—migrations generally cannot stop on a dime. Thus, you should check the config.locks collection to see whether or not a balancing round is still in progress:

> db.locks.findOne({"_id" : "balancer"}).state
0

A state of 0 means that no balancing round is currently in progress.
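
You can also ask mongos directly whether a round is in progress using the balancerStatus command (or its sh.isBalancerRunning() wrapper), which avoids interpreting lock state by hand:

> db.adminCommand({"balancerStatus" : 1})["inBalancerRound"]
false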

Balancing puts load on your system: the destination shard must query the source shard for all the documents in a chunk and insert them, and then the source shard must delete them. There are two circumstances in particular where migrations can cause performance problems:

  1. Using a hotspot shard key will force constant migrations (as all new chunks will be created on the hotspot). Your system must have the capacity to handle the flow of data coming off of your hotspot shard.

  2. Adding a new shard will trigger a stream of migrations as the balancer attempts to populate it.

If you find that migrations are affecting your application’s performance, you can schedule a window for balancing in the config.settings collection. Run the following update to only allow balancing between 1 p.m. and 4 p.m. First make sure the balancer is on, then schedule the window:

> sh.setBalancerState( true )
{
    "ok" : 1,
    "operationTime" : Timestamp(1541451846, 4),
    "$clusterTime" : {
        "clusterTime" : Timestamp(1541451846, 4),
        "signature" : {
            "hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
            "keyId" : NumberLong(0)
        }
    }
}
> db.settings.update(
   { _id: "balancer" },
   { $set: { activeWindow : { start : "13:00", stop : "16:00" } } },
   { upsert: true }
)
WriteResult({ "nMatched" : 1, "nUpserted" : 0, "nModified" : 1 })

If you set a balancing window, monitor it closely to ensure that mongos can actually keep your cluster balanced in the time that you have allotted it.

You must be careful if you plan to combine manual balancing with the automatic balancer, since the automatic balancer always determines what to move based on the current state of the set and does not take into account the set’s history. For example, suppose you have shardA and shardB, each holding 500 chunks. shardA is getting a lot of writes, so you turn off the balancer and move 30 of the most active chunks to shardB. If you turn the balancer back on at this point, it will immediately swoop in and move 30 chunks (possibly a different 30) back from shardB to shardA to balance the chunk counts.

To prevent this, move 30 quiescent chunks from shardB to shardA before starting the balancer. That way there will be no imbalance between the shards and the balancer will be happy to leave things as they are. Alternatively, you could perform 30 splits on shardA’s chunks to even out the chunk counts.

Note that the balancer only uses number of chunks as a metric, not size of data. Moving a chunk is called a migration and is how MongoDB balances data across your cluster. Thus, a shard with a few large chunks may end up as the target of a migration from a shard with many small chunks (but a smaller data size).

Changing Chunk Size

There can be anywhere from zero to millions of documents in a chunk. Generally, the larger a chunk is, the longer it takes to migrate to another shard. In Chapter 14, we used a chunk size of 1 MB, so that we could see chunk movement easily and quickly. This is generally impractical in a live system; MongoDB would be doing a lot of unnecessary work to keep shards within a few megabytes of each other in size. By default chunks are 64 MB, which generally provides a good balance between ease of migration and migratory churn.

Sometimes you may find that migrations are taking too long with 64 MB chunks. To speed them up, you can decrease your chunk size. To do this, connect to mongos through the shell and update the config.settings collection:

> db.settings.findOne()
{
    "_id" : "chunksize", 
    "value" : 64 
}
> db.settings.save({"_id" : "chunksize", "value" : 32})
WriteResult({ "nMatched" : 1, "nUpserted" : 0, "nModified" : 1 })

The previous update would change your chunk size to 32 MB. Existing chunks would not be changed immediately, however; automatic splitting only occurs on insert or update. Thus, if you lower the chunk size, it may take time for all chunks to split to the new size.

Splits cannot be undone. If you increase the chunk size, existing chunks grow only through insertion or updates until they reach the new size. The allowed range of the chunk size is between 1 and 1,024 MB, inclusive.

Note that this is a cluster-wide setting: it affects all collections and databases. Thus, if you need a small chunk size for one collection and a large chunk size for another, you may have to compromise with a chunk size in between the two ideals (or put the collections in different clusters).

Tip

If MongoDB is doing too many migrations or your documents are large, you may want to increase your chunk size.

Moving Chunks

As mentioned earlier, all the data in a chunk lives on a certain shard. If that shard ends up with more chunks than the other shards, MongoDB will move some chunks off it.

You can manually move chunks using the moveChunk shell helper:

> sh.moveChunk("video.movies", {imdbId: 500000}, "shard02") 
{ "millis" : 4079, "ok" : 1 }

This would move the chunk containing the document with an "imdbId" of 500000 to the shard named shard02. You must use the shard key ("imdbId", in this case) to find which chunk to move. Generally, the easiest way to specify a chunk is by its lower bound, although any value in the chunk will work (the upper bound will not, as it is not actually in the chunk). This command will move the chunk before returning, so it may take a while to run. The logs are the best place to see what it is doing if it takes a long time.

If a chunk is larger than the max chunk size, mongos will refuse to move it:

> sh.moveChunk("video.movies", {imdbId: NumberLong("8345072417171006784")}, 
  "shard02")
{
    "cause" : {
        "chunkTooBig" : true,
        "estimatedChunkSize" : 2214960,
        "ok" : 0,
        "errmsg" : "chunk too big to move"
    },
    "ok" : 0,
    "errmsg" : "move failed"
}

In this case, you must manually split the chunk before moving it, using the sh.splitAt() helper:

> db.chunks.find({ns: "video.movies", "min.imdbId": 
  NumberLong("6386258634539951337")}).pretty()
{
    "_id" : "video.movies-imdbId_6386258634539951337",
    "ns" : "video.movies",
    "min" : {
        "imdbId" : NumberLong("6386258634539951337")
    },
    "max" : {
        "imdbId" : NumberLong("8345072417171006784")
    },
    "shard" : "shard02",
    "lastmod" : Timestamp(1, 9),
    "lastmodEpoch" : ObjectId("5bdf72c021b6e3be02fabe0c"),
    "history" : [
        {
            "validAfter" : Timestamp(1541370559, 4),
            "shard" : "shard02"
        }
    ]
}
> sh.splitAt("video.movies", {"imdbId": 
  NumberLong("7000000000000000000")})
{
    "ok" : 1,
    "operationTime" : Timestamp(1541453304, 1),
    "$clusterTime" : {
        "clusterTime" : Timestamp(1541453306, 5),
        "signature" : {
            "hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
            "keyId" : NumberLong(0)
        }
    }
}
> db.chunks.find({ns: "video.movies", "min.imdbId": 
  NumberLong("6386258634539951337")}).pretty()
{
    "_id" : "video.movies-imdbId_6386258634539951337",
    "lastmod" : Timestamp(15, 2),
    "lastmodEpoch" : ObjectId("5bdf72c021b6e3be02fabe0c"),
    "ns" : "video.movies",
    "min" : {
        "imdbId" : NumberLong("6386258634539951337")
    },
    "max" : {
        "imdbId" : NumberLong("7000000000000000000")
    },
    "shard" : "shard02",
    "history" : [
        {
            "validAfter" : Timestamp(1541370559, 4),
            "shard" : "shard02"
        }
    ]
}

Once the chunk has been split into smaller pieces, it should be movable. Alternatively, you can raise the max chunk size and then move it, but you should break up large chunks whenever possible. Sometimes, though, chunks cannot be broken up—we’ll look at this situation next.1

Jumbo Chunks

Suppose you choose the "date" field as your shard key. The "date" field in this collection is a string that looks like "year/month/day", which means that mongos can create at most one chunk per day. This works fine for a while, until your application suddenly goes viral and gets a thousand times its typical traffic for one day.

This day’s chunk is going to be much larger than any other day’s, but it is also completely unsplittable because every document has the same value for the shard key.

Once a chunk is larger than the max chunk size set in config.settings, the balancer will not be allowed to move the chunk. These unsplittable, unmovable chunks are called jumbo chunks and they are inconvenient to deal with.

Let’s take an example. Suppose you have three shards, shard1, shard2, and shard3. If you use the hotspot shard key pattern described in “Ascending Shard Keys”, all your writes will be going to one shard—say, shard1. The shard primary mongod will request that the balancer move each new top chunk evenly between the other shards, but the only chunks that the balancer can move are the nonjumbo chunks, so it will migrate all the small chunks off the hot shard.

Now all the shards will have roughly the same number of chunks, but all of shard2 and shard3’s chunks will be less than 64 MB in size. And if jumbo chunks are being created, more and more of shard1’s chunks will be more than 64 MB in size. Thus, shard1 will fill up a lot faster than the other two shards, even though the number of chunks is perfectly balanced between the three.

Thus, one of the indicators that you have jumbo chunk problems is that one shard’s size is growing much faster than the others. You can also look at the output of sh.status() to see if you have jumbo chunks—they will be marked with the jumbo attribute:

> sh.status()
...
    { "x" : -7 } -->> { "x" : 5 } on : shard0001 
    { "x" : 5 } -->> { "x" : 6 } on : shard0001 jumbo
    { "x" : 6 } -->> { "x" : 7 } on : shard0001 jumbo
    { "x" : 7 } -->> { "x" : 339 } on : shard0001
...
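
You can also query config.chunks directly for chunks that have already been flagged; the "jumbo" field is only present on chunks the balancer has marked:

> use config
> db.chunks.find({"ns" : "video.movies", "jumbo" : true})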

You can use the dataSize command to check chunk sizes. First, use the config.chunks collection to find the chunk ranges:

> use config
> var chunks = db.chunks.find({"ns" : "acme.analytics"}).toArray()

Then use these chunk ranges to find possible jumbo chunks:

> use <dbName>
> db.runCommand({"dataSize" : "<dbName.collName>",
... "keyPattern" : {"date" : 1}, // shard key
... "min" : chunks[0].min, 
... "max" : chunks[0].max})
{
    "size" : 33567917,
    "numObjects" : 108942,
    "millis" : 634,
    "ok" : 1,
    "operationTime" : Timestamp(1541455552, 10),
    "$clusterTime" : {
        "clusterTime" : Timestamp(1541455552, 10),
        "signature" : {
            "hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
            "keyId" : NumberLong(0)
        }
    }
}

Be careful, though—the dataSize command does have to scan the chunk’s data to figure out how big it is. If you can, narrow down your search by using your knowledge of your data: were jumbo chunks created on a certain date? For example, if July 1 was a really busy day, look for chunks with that day in their shard key range.
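
If you do need to check every chunk, a minimal sketch along these lines will flag anything over the default 64 MB chunk size. It assumes the acme.analytics collection and the chunks array from the snippet above (substitute your own database, collection, and shard key):

> use acme
> chunks.forEach(function(chunk) {
...     var ds = db.runCommand({"dataSize" : "acme.analytics",
...                             "keyPattern" : {"date" : 1},
...                             "min" : chunk.min,
...                             "max" : chunk.max});
...     if (ds.size > 64 * 1024 * 1024) {
...         printjson({"chunk" : chunk._id, "size" : ds.size});
...     }
... })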

Tip

If you’re using GridFS and sharding by "files_id", you can look at the fs.files collection to find a file’s size.

Distributing jumbo chunks

To fix a cluster thrown off-balance by jumbo chunks, you must evenly distribute them among the shards.

This is a complex manual process, but should not cause any downtime (it may cause slowness, as you’ll be migrating a lot of data). In the following description, the shard with the jumbo chunks is referred to as the “from” shard. The shards that the jumbo chunks are migrated to are called the “to” shards. Note that you may have multiple “from” shards that you wish to move chunks off of. Repeat these steps for each:

  1. Turn off the balancer. You don’t want the balancer trying to “help” during this process:

    > sh.setBalancerState(false)
  2. MongoDB will not allow you to move chunks larger than the max chunk size, so temporarily increase the chunk size. Make a note of what your original chunk size is and then change it to something large, like 10000. Chunk size is specified in megabytes:

    > use config
    > db.settings.findOne({"_id" : "chunksize"})
    {
        "_id" : "chunksize", 
        "value" : 64
    }
    > db.settings.save({"_id" : "chunksize", "value" : 10000})
  3. Use the moveChunk command to move jumbo chunks off the “from” shard.

  4. Run splitChunk on the remaining chunks on the “from” shard until it has roughly the same number of chunks as the “to” shards.

  5. Set the chunk size back to its original value:

    > db.settings.save({"_id" : "chunksize", "value" : 64})
  6. Turn on the balancer:

    > sh.setBalancerState(true)

When the balancer is turned on again, it will once again be unable to move the jumbo chunks; they are essentially held in place by their size.

Preventing jumbo chunks

As the amount of data you are storing grows, the manual process described in the previous section becomes unsustainable. Thus, if you’re having problems with jumbo chunks, you should make it a priority to prevent them from forming.

To prevent jumbo chunks, modify your shard key to have more granularity. You want almost every document to have a unique value for the shard key, or at least to never have more than the chunk size’s worth of data with a single shard key value.

For example, if you were using the year/month/day key described earlier, it could quickly be made more fine-grained by adding hours, minutes, and seconds. Similarly, if you’re sharding on something coarse-grained like log level, you can add to your shard key a second field with a lot of granularity, such as an MD5 hash or UUID. Then you can always split a chunk, even if the first field is the same for many documents.
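
For example, a hypothetical logs.events collection sharded on a coarse "level" field could instead be sharded on a compound key that appends a high-cardinality field (here an assumed per-document "uuid"), so that any chunk can still be split:

> sh.shardCollection("logs.events", {"level" : 1, "uuid" : 1})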

Refreshing Configurations

As a final tip, sometimes mongos will not update its configuration correctly from the config servers. If you ever get a configuration that you don’t expect or a mongos seems to be out of date or cannot find data that you know is there, use the flushRouterConfig command to manually clear all caches:

> db.adminCommand({"flushRouterConfig" : 1})

If flushRouterConfig does not work, restarting all your mongos or mongod processes clears any cached data.

1 MongoDB 4.4 is planning to add a new parameter (forceJumbo) in the moveChunk function, as well as a new balancer configuration setting attemptToBalanceJumboChunks to address jumbo chunks. The details are in this JIRA ticket describing the work.
