What is Sharding?
Sharding is the process by which MongoDB divides a large data set into smaller subsets and distributes them across multiple MongoDB instances. When a collection grows very large, queries against it can drive up CPU usage on a single server, and sharding is MongoDB's way of tackling this. The data is split into shards (subsets of the data) that together function logically as a single collection. Sharding is implemented through a cluster that consists of:
- Shard(s): MongoDB instances that each host a subset of the data. In production, every shard should be deployed as a replica set, whose members all maintain the same data set. Replica sets provide redundancy and high availability, and are the basis for all production deployments.
- Config server: A mongod instance that holds the cluster's metadata, such as which shard holds which portion of the data. Config servers are lightweight mongod processes. In a production environment there are usually 3 config servers, each holding a copy of the metadata, and the copies are kept in sync. As long as at least one config server is alive, the cluster can keep serving reads and writes, although metadata changes such as chunk splits and migrations need a majority of the config servers to be available.
- Query router: A mongos instance that is responsible for directing the client's commands to the right servers. It doesn't have a persistent state. It acts as the interface between the client application and the relevant shards: it uses the metadata from the config servers to work out which shards hold the data needed to answer a query, routes the query to those shards, and returns the combined result to the client.
In a two-part series about sharding in MongoDB, we explain:
- How to create a sharded cluster
- How to select a shard key
In the first part we provide a tutorial on how to create a sharded cluster, explaining all the basics of sharding.
Sharded Cluster Tutorial
To create a cluster, we need to be clear about:
- The number of shards required initially
- Replication factor and other parameters for the replica set
mongod and mongos are two different processes in MongoDB. mongod is the primary daemon: it handles data requests, manages data access, and performs background management operations. mongos is the routing service for sharded clusters: it processes and routes user queries and determines the location of data in the cluster. The shards and config servers run the mongod process, whereas the query routers run the mongos process.
For our illustration, let's take 4 shards (namely a, b, c, and d) and a replication factor of 3, so in total there will be 12 shard servers (mongod processes). We'll also run 4 query routers (mongos processes) and 3 config servers (small mongod processes), as one would in a production environment, but with everything running on one machine to simulate a cluster.
In a real deployment, these shards and their replica set members would run on 12 different machines. Config servers, however, are lightweight processes, and with only 4 shards their load will be low, so you can run them on any of the shard servers. The mongos processes can run either on a shard server or directly on a client application machine.
The benefit of running mongos on the client application machine is that the application talks to it over a local interface, without going out over the network. However, you should remember to enable the right security options. On the other hand, if you run mongos on a shard server or any other server, the clients don't have to set up and run anything themselves.
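To keep track of how many processes this layout involves, here is a minimal sketch in plain JavaScript that enumerates them. The helper name clusterLayout and the fields of the objects it returns are our own invention for illustration; the ports are the ones used in the steps of this tutorial.

```javascript
// Illustrative helper (not part of MongoDB): list every process in the
// tutorial's single-machine layout, using the ports from this tutorial.
function clusterLayout(shardNames, replicationFactor) {
  const processes = [];

  // Three config servers on ports 26050-26052.
  for (let i = 0; i < 3; i++) {
    processes.push({ binary: "mongod", role: "config", name: `cfg${i}`, port: 26050 + i });
  }

  // Shard k gets the port block starting at 27000 + 100 * k, one port per member.
  shardNames.forEach((shard, k) => {
    for (let m = 0; m < replicationFactor; m++) {
      processes.push({ binary: "mongod", role: "shard", name: `${shard}${m}`, port: 27000 + 100 * k + m });
    }
  });

  // Four query routers: the first on 27017, the rest on 26061-26063.
  [27017, 26061, 26062, 26063].forEach((port, i) => {
    processes.push({ binary: "mongos", role: "router", name: `mongos${i}`, port });
  });

  return processes;
}

const layout = clusterLayout(["a", "b", "c", "d"], 3);
console.log(layout.length); // 19 processes: 3 config + 12 shard + 4 mongos
```

Laying the ports out in blocks of 100 per shard is only a convention of this tutorial; any free ports would do.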
Data directories
The first step in the process is to create data directories for our shard servers, config servers, and logs.
Tue Oct 27 piyushgoyal mongo_tutorial $ mkdir -p data/a0
Tue Oct 27 piyushgoyal mongo_tutorial $ mkdir -p data/a1
Tue Oct 27 piyushgoyal mongo_tutorial $ mkdir -p data/a2
Tue Oct 27 piyushgoyal mongo_tutorial $ mkdir -p data/b0
Tue Oct 27 piyushgoyal mongo_tutorial $ mkdir -p data/b1
Tue Oct 27 piyushgoyal mongo_tutorial $ mkdir -p data/b2
Tue Oct 27 piyushgoyal mongo_tutorial $ mkdir -p data/c0
Tue Oct 27 piyushgoyal mongo_tutorial $ mkdir -p data/c1
Tue Oct 27 piyushgoyal mongo_tutorial $ mkdir -p data/c2
Tue Oct 27 piyushgoyal mongo_tutorial $ mkdir -p data/d0
Tue Oct 27 piyushgoyal mongo_tutorial $ mkdir -p data/d1
Tue Oct 27 piyushgoyal mongo_tutorial $ mkdir -p data/d2
Tue Oct 27 piyushgoyal mongo_tutorial $ mkdir -p config/cfg0
Tue Oct 27 piyushgoyal mongo_tutorial $ mkdir -p config/cfg1
Tue Oct 27 piyushgoyal mongo_tutorial $ mkdir -p config/cfg2
Tue Oct 27 piyushgoyal mongo_tutorial $ mkdir logs
Config server and Shard server
Use the mongod command, as shown below, to start the config servers. The --configsvr option declares that each instance is a config server for the cluster, and --replSet names the replica set they belong to (cfg).
Tue Oct 27 piyushgoyal mongo_tutorial $ mongod --configsvr --replSet cfg --dbpath config/cfg0 --port 26050 --fork --logpath logs/log.cfg0 --logappend
Tue Oct 27 piyushgoyal mongo_tutorial $ mongod --configsvr --replSet cfg --dbpath config/cfg1 --port 26051 --fork --logpath logs/log.cfg1 --logappend
Tue Oct 27 piyushgoyal mongo_tutorial $ mongod --configsvr --replSet cfg --dbpath config/cfg2 --port 26052 --fork --logpath logs/log.cfg2 --logappend
Now let's get on with our shard servers. Here, we run mongod with the --shardsvr option, which declares that the instance is part of a sharded cluster. We also name each replica set after its shard.
Tue Oct 27 piyushgoyal mongo_tutorial $ mongod --shardsvr --replSet a --dbpath data/a0 --port 27000 --fork --logpath logs/log.a0 --logappend
Tue Oct 27 piyushgoyal mongo_tutorial $ mongod --shardsvr --replSet a --dbpath data/a1 --port 27001 --fork --logpath logs/log.a1 --logappend
Tue Oct 27 piyushgoyal mongo_tutorial $ mongod --shardsvr --replSet a --dbpath data/a2 --port 27002 --fork --logpath logs/log.a2 --logappend
Tue Oct 27 piyushgoyal mongo_tutorial $ mongod --shardsvr --replSet b --dbpath data/b0 --port 27100 --fork --logpath logs/log.b0 --logappend
Tue Oct 27 piyushgoyal mongo_tutorial $ mongod --shardsvr --replSet b --dbpath data/b1 --port 27101 --fork --logpath logs/log.b1 --logappend
Tue Oct 27 piyushgoyal mongo_tutorial $ mongod --shardsvr --replSet b --dbpath data/b2 --port 27102 --fork --logpath logs/log.b2 --logappend
Tue Oct 27 piyushgoyal mongo_tutorial $ mongod --shardsvr --replSet c --dbpath data/c0 --port 27200 --fork --logpath logs/log.c0 --logappend
Tue Oct 27 piyushgoyal mongo_tutorial $ mongod --shardsvr --replSet c --dbpath data/c1 --port 27201 --fork --logpath logs/log.c1 --logappend
Tue Oct 27 piyushgoyal mongo_tutorial $ mongod --shardsvr --replSet c --dbpath data/c2 --port 27202 --fork --logpath logs/log.c2 --logappend
Tue Oct 27 piyushgoyal mongo_tutorial $ mongod --shardsvr --replSet d --dbpath data/d0 --port 27300 --fork --logpath logs/log.d0 --logappend
Tue Oct 27 piyushgoyal mongo_tutorial $ mongod --shardsvr --replSet d --dbpath data/d1 --port 27301 --fork --logpath logs/log.d1 --logappend
Tue Oct 27 piyushgoyal mongo_tutorial $ mongod --shardsvr --replSet d --dbpath data/d2 --port 27302 --fork --logpath logs/log.d2 --logappend
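The fifteen mongod commands differ only in the member name and port, which makes them easy to mistype when copied by hand. As a rough sketch, the command line can be generated so that the data path and the log path are both derived from the same member name. The helper mongodCommand and its parameter names are hypothetical; the flags are exactly the ones used in the commands of this tutorial.

```javascript
// Hypothetical helper: build the mongod command line for one member so that
// --dbpath and --logpath are both derived from the same member name.
function mongodCommand({ role, replSet, member, port }) {
  // Config servers keep their data under config/, shard servers under data/.
  const dbpath = role === "configsvr" ? `config/${member}` : `data/${member}`;
  return [
    "mongod",
    `--${role}`,
    `--replSet ${replSet}`,
    `--dbpath ${dbpath}`,
    `--port ${port}`,
    "--fork",
    `--logpath logs/log.${member}`,
    "--logappend",
  ].join(" ");
}

const cmd = mongodCommand({ role: "shardsvr", replSet: "c", member: "c0", port: 27200 });
console.log(cmd);
// mongod --shardsvr --replSet c --dbpath data/c0 --port 27200 --fork --logpath logs/log.c0 --logappend
```

The script only prints the command; running it is still up to you.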
Mongos Server
In the next step, we run the mongos command to start the mongos processes. To tell them where the config servers are, we pass the config servers' replica set name and addresses using the --configdb parameter. Following common practice, the first mongos process runs on the standard MongoDB TCP port (27017); the other three use ports 26061 to 26063.
Tue Oct 27 piyushgoyal mongo_tutorial $ mongos --configdb cfg/127.0.0.1:26050,127.0.0.1:26051,127.0.0.1:26052 --logpath logs/log.mongos0 --port 27017 --bind_ip 0.0.0.0 --fork --logappend
Tue Oct 27 piyushgoyal mongo_tutorial $ mongos --configdb cfg/127.0.0.1:26050,127.0.0.1:26051,127.0.0.1:26052 --logpath logs/log.mongos1 --port 26061 --bind_ip 0.0.0.0 --fork --logappend
Tue Oct 27 piyushgoyal mongo_tutorial $ mongos --configdb cfg/127.0.0.1:26050,127.0.0.1:26051,127.0.0.1:26052 --logpath logs/log.mongos2 --port 26062 --bind_ip 0.0.0.0 --fork --logappend
Tue Oct 27 piyushgoyal mongo_tutorial $ mongos --configdb cfg/127.0.0.1:26050,127.0.0.1:26051,127.0.0.1:26052 --logpath logs/log.mongos3 --port 26063 --bind_ip 0.0.0.0 --fork --logappend
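The value passed to --configdb has the form <replica set name>/<host:port>,<host:port>,... As a small sketch, it can be assembled from the replica set name and the list of ports; replSetString is a name we made up for this illustration.

```javascript
// Illustrative helper: build a "<replSet>/<host:port>,<host:port>,..." string.
function replSetString(replSet, host, ports) {
  return `${replSet}/` + ports.map((port) => `${host}:${port}`).join(",");
}

const configDb = replSetString("cfg", "127.0.0.1", [26050, 26051, 26052]);
console.log(configDb); // cfg/127.0.0.1:26050,127.0.0.1:26051,127.0.0.1:26052
```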
Note that we never run the config servers or the shard servers on the standard MongoDB TCP port; it is kept for mongos, which is the process clients connect to.
Now, check if all the processes are running smoothly.
Tue Oct 27 piyushgoyal mongo_tutorial $ ps -A | grep mongo
26745 ?? 0:00.95 mongod --configsvr --replSet cfg --dbpath config/cfg0 --port 26050 --fork --logpath logs/log.cfg0 --logappend
26748 ?? 0:00.90 mongod --configsvr --replSet cfg --dbpath config/cfg1 --port 26051 --fork --logpath logs/log.cfg1 --logappend
26754 ?? 0:00.90 mongod --configsvr --replSet cfg --dbpath config/cfg2 --port 26052 --fork --logpath logs/log.cfg2 --logappend
26757 ?? 0:00.77 mongod --shardsvr --replSet a --dbpath data/a0 --port 27000 --fork --logpath logs/log.a0 --logappend
26760 ?? 0:00.77 mongod --shardsvr --replSet a --dbpath data/a1 --port 27001 --fork --logpath logs/log.a1 --logappend
26763 ?? 0:00.76 mongod --shardsvr --replSet a --dbpath data/a2 --port 27002 --fork --logpath logs/log.a2 --logappend
26766 ?? 0:00.76 mongod --shardsvr --replSet b --dbpath data/b0 --port 27100 --fork --logpath logs/log.b0 --logappend
26769 ?? 0:00.77 mongod --shardsvr --replSet b --dbpath data/b1 --port 27101 --fork --logpath logs/log.b1 --logappend
26772 ?? 0:00.75 mongod --shardsvr --replSet b --dbpath data/b2 --port 27102 --fork --logpath logs/log.b2 --logappend
26775 ?? 0:00.73 mongod --shardsvr --replSet c --dbpath data/c0 --port 27200 --fork --logpath logs/log.b0 --logappend
26784 ?? 0:00.75 mongod --shardsvr --replSet c --dbpath data/c1 --port 27201 --fork --logpath logs/log.b1 --logappend
26791 ?? 0:00.74 mongod --shardsvr --replSet c --dbpath data/c2 --port 27202 --fork --logpath logs/log.b2 --logappend
26794 ?? 0:00.77 mongod --shardsvr --replSet d --dbpath data/d0 --port 27300 --fork --logpath logs/log.c0 --logappend
26797 ?? 0:00.75 mongod --shardsvr --replSet d --dbpath data/d1 --port 27301 --fork --logpath logs/log.c1 --logappend
26800 ?? 0:00.71 mongod --shardsvr --replSet d --dbpath data/d2 --port 27302 --fork --logpath logs/log.c2 --logappend
26803 ?? 0:00.00 mongos --configdb cfg/127.0.0.1:26050,127.0.0.1:26051,127.0.0.1:26052 --logpath logs/log.mongos0 --port 27017 --bind_ip 0.0.0.0 --fork --logappend
26804 ?? 0:00.24 mongos --configdb cfg/127.0.0.1:26050,127.0.0.1:26051,127.0.0.1:26052 --logpath logs/log.mongos0 --port 27017 --bind_ip 0.0.0.0 --fork --logappend
76826 ?? 8:58.30 /usr/local/opt/mongodb-community/bin/mongod --config /usr/local/etc/mongod.conf
26817 ttys009 0:00.00 grep mongo
26801 ttys016 0:00.01 mongos --configdb cfg/127.0.0.1:26050,127.0.0.1:26051,127.0.0.1:26052 --logpath logs/log.mongos0 --port 27017 --bind_ip 0.0.0.0 --fork --logappend
Status and Initiation of Replica Sets
Connect to one of the shard servers. For instance, let's connect to the first member of shard a (port 27000):
Tue Oct 27 piyushgoyal mongo_tutorial $ mongo --port 27000
Run the following command to check the status of the replica set:
> rs.status()
{
"operationTime" : Timestamp(0, 0),
"ok" : 0,
"errmsg" : "no replset config has been received",
"code" : 94,
"codeName" : "NotYetInitialized",
"$clusterTime" : {
"clusterTime" : Timestamp(0, 0),
"signature" : {
"hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
"keyId" : NumberLong(0)
}
}
}
The output reports that the replica set has not been initiated yet (NotYetInitialized). To initiate the replica set, run the following command:
> rs.initiate()
{
"info2" : "no configuration specified. Using a default configuration for the set",
"me" : "localhost:27000",
"ok" : 1,
"$clusterTime" : {
"clusterTime" : Timestamp(1604637446, 1),
"signature" : {
"hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
"keyId" : NumberLong(0)
}
},
"operationTime" : Timestamp(1604637446, 1)
}
If you run rs.status() once again, you’ll see that a single member has been added. Now, let’s add others as well to this replica set.
a:PRIMARY> rs.add("127.0.0.1:27001")
{
"ok" : 1,
"$clusterTime" : {
"clusterTime" : Timestamp(1604637486, 1),
"signature" : {
"hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
"keyId" : NumberLong(0)
}
},
"operationTime" : Timestamp(1604637486, 1)
}
a:PRIMARY> rs.add("127.0.0.1:27002")
{
"ok" : 1,
"$clusterTime" : {
"clusterTime" : Timestamp(1604637491, 1),
"signature" : {
"hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
"keyId" : NumberLong(0)
}
},
"operationTime" : Timestamp(1604637491, 1)
}
Run rs.status() again to confirm that all three members are listed. Once the replica set is up, repeat the same process for the other shards and the config servers.
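As an alternative to calling rs.initiate() with no arguments and then adding members one by one, rs.initiate() also accepts a configuration document that lists every member up front. Below is a minimal sketch that builds such a document with all members under the same host name. The helper replSetConfig is our own; the _id, members, host and configsvr fields are part of the replica set configuration format.

```javascript
// Illustrative helper: build a replica set configuration document with every
// member listed under the same host name.
function replSetConfig(name, host, ports, isConfigServer = false) {
  const config = {
    _id: name,
    members: ports.map((port, i) => ({ _id: i, host: `${host}:${port}` })),
  };
  // Config server replica sets carry configsvr: true in their configuration.
  if (isConfigServer) config.configsvr = true;
  return config;
}

const shardA = replSetConfig("a", "127.0.0.1", [27000, 27001, 27002]);
console.log(JSON.stringify(shardA));
```

In the mongo shell connected to port 27000, passing this document as rs.initiate(shardA) should bring up the whole set in one step, with every member registered as 127.0.0.1 rather than a mix of localhost and 127.0.0.1.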
Add Shards
Connect to the mongos process so as to add shards to the cluster.
Tue Oct 27 piyushgoyal mongo_tutorial $ mongo --port 27017
Run the command below to add shards:
mongos> sh.addShard("a/127.0.0.1:27000")
{
"shardAdded" : "a",
"ok" : 1,
"operationTime" : Timestamp(1604637907, 8),
"$clusterTime" : {
"clusterTime" : Timestamp(1604637907, 8),
"signature" : {
"hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
"keyId" : NumberLong(0)
}
}
}
mongos> sh.addShard("b/127.0.0.1:27100")
{
"shardAdded" : "b",
"ok" : 1,
"operationTime" : Timestamp(1604638045, 6),
"$clusterTime" : {
"clusterTime" : Timestamp(1604638045, 6),
"signature" : {
"hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
"keyId" : NumberLong(0)
}
}
}
mongos> sh.addShard("c/127.0.0.1:27200")
{
"shardAdded" : "c",
"ok" : 1,
"operationTime" : Timestamp(1604638065, 4),
"$clusterTime" : {
"clusterTime" : Timestamp(1604638065, 4),
"signature" : {
"hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
"keyId" : NumberLong(0)
}
}
}
mongos> sh.addShard("d/127.0.0.1:27300")
{
"shardAdded" : "d",
"ok" : 1,
"operationTime" : Timestamp(1604638086, 6),
"$clusterTime" : {
"clusterTime" : Timestamp(1604638086, 6),
"signature" : {
"hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
"keyId" : NumberLong(0)
}
}
}
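Note: If sh.addShard("a/127.0.0.1:27000") returns an error like the one shown below, try running sh.addShard("a/127.0.0.1:27001") or sh.addShard("a/127.0.0.1:27002") instead. The error arises because rs.initiate() registered the first member as localhost:27000, so 127.0.0.1:27000 is not in the replica set's host list.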
{
"ok" : 0,
"errmsg" : "in seed list a/127.0.0.1:27000, host 127.0.0.1:27000 does not belong to replica set a; found { hosts: [ \"localhost:27000\", \"127.0.0.1:27001\", \"127.0.0.1:27002\" ], setName: \"a\", setVersion: 3, ismaster: true, secondary: false, primary: \"localhost:27000\", me: \"localhost:27000\", electionId: ObjectId('7fffffff0000000000000001'), lastWrite: { opTime: { ts: Timestamp(1604637886, 1), t: 1 }, lastWriteDate: new Date(1604637886000), majorityOpTime: { ts: Timestamp(1604637886, 1), t: 1 }, majorityWriteDate: new Date(1604637886000) }, maxBsonObjectSize: 16777216, maxMessageSizeBytes: 48000000, maxWriteBatchSize: 100000, localTime: new Date(1604637894239), logicalSessionTimeoutMinutes: 30, connectionId: 21, minWireVersion: 0, maxWireVersion: 8, readOnly: false, compression: [ \"snappy\", \"zstd\", \"zlib\" ], ok: 1.0, $clusterTime: { clusterTime: Timestamp(1604637886, 1), signature: { hash: BinData(0, 0000000000000000000000000000000000000000), keyId: 0 } }, operationTime: Timestamp(1604637886, 1) }",
"code" : 96,
"codeName" : "OperationFailed",
"operationTime" : Timestamp(1604637888, 1),
"$clusterTime" : {
"clusterTime" : Timestamp(1604637888, 1),
"signature" : {
"hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
"keyId" : NumberLong(0)
}
}
}
Sharding a collection
Run sh.status() to check if the shards have been added.
Now that we have set up the cluster, let’s test it.
When you shard a collection, you have to choose a shard key; without one, sharding is impossible. The shard key determines how documents are distributed among the shards of the cluster.
A contiguous range of shard key values, along with the documents that fall within it, is called a chunk. The chunk size can only be set between 1 and 1024 megabytes.
For example, let’s assume a collection of the following documents:
{
"_id": ObjectId("5f97d97eb7a0a940f157ebc8"),
"x": 1
}
If we use key x to shard the collection, the data is distributed as shown below:
x_low   x_high   shard
0       1000     S2      ----> chunk
1000    2000     S0      ----> chunk
2000    3000     S1      ----> chunk
The collection's documents are distributed according to their shard key values. Documents with the same shard key value always belong to the same chunk, and hence to the same shard.
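To make the lookup concrete, here is a small sketch that finds the shard for a given value of x using the chunk table from the example. It assumes each chunk includes its lower bound and excludes its upper bound; the function name shardFor is ours.

```javascript
// Chunk table from the example: each chunk covers [low, high) on key x.
const chunks = [
  { low: 0, high: 1000, shard: "S2" },
  { low: 1000, high: 2000, shard: "S0" },
  { low: 2000, high: 3000, shard: "S1" },
];

// Return the shard owning the chunk whose range contains x, or null if none.
function shardFor(x, chunkTable) {
  const chunk = chunkTable.find((c) => x >= c.low && x < c.high);
  return chunk ? chunk.shard : null;
}

console.log(shardFor(55, chunks));   // S2
console.log(shardFor(1000, chunks)); // S0
console.log(shardFor(2500, chunks)); // S1
```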
Sharding strategies fall into two types:
- Range-based sharding
- Hashed sharding
To keep this tutorial simple, we use range-based sharding, which is helpful for queries involving a range. Behind the scenes, the system also carries out two key operations:
- Split
- Migration (handled by the cluster balancer)
For example, if a chunk exceeds the configured size, the system will, on its own, attempt to find an appropriate median key and divide the chunk into 2 or more parts. This is known as a split, and it is an inexpensive, metadata-only change. Migration, on the other hand, is responsible for maintaining balance across the cluster by moving chunks from one shard to another. It is an expensive operation, since it transfers the chunk's data, which can be as much as ~1024 MB, between servers.
You can still read from and write to the database while a migration is in progress. Migrations have a liveness property that keeps the database available for operations, so no big lock occurs.
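The idea of a split can be illustrated with a deliberately simplified model. In the sketch below a chunk is just a key range plus the sorted list of keys it holds, and the split point is the median key. This is not MongoDB's implementation (real splits are driven by chunk size in bytes); the function splitAtMedian and the chunk shape are assumptions made for illustration.

```javascript
// Simplified model: a chunk is { low, high, keys }, where keys is the sorted
// list of shard key values it holds and the range is [low, high).
function splitAtMedian(chunk) {
  const median = chunk.keys[Math.floor(chunk.keys.length / 2)];
  return [
    { low: chunk.low, high: median, keys: chunk.keys.filter((k) => k < median) },
    { low: median, high: chunk.high, keys: chunk.keys.filter((k) => k >= median) },
  ];
}

const [left, right] = splitAtMedian({ low: 0, high: 1000, keys: [3, 10, 42, 500, 730, 999] });
console.log(left);  // range [0, 500) holding keys 3, 10, 42
console.log(right); // range [500, 1000) holding keys 500, 730, 999
```

Note that only the range boundaries change: no key moves to another shard, which is why a split is cheap compared to a migration.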
For our example, we’re creating a database named tutorial with collection foo and are going to insert some documents.
Tue Oct 27 piyushgoyal mongo_tutorial $ mongo --port 27017
mongos> use tutorial
mongos> for(var i=0; i<999999; i++) { db.foo.insert({x:i}) }
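Inserting close to a million documents one at a time can take a while. As a possible alternative, here is a sketch that sends the same documents in batches through insertMany(). The function insertInBatches, the batch size of 10000 and the stand-in collection are our own assumptions; the stand-in exists only so the sketch runs outside the mongo shell.

```javascript
// Insert documents {x: 0} ... {x: total - 1} in batches via insertMany().
// "collection" is anything with a synchronous insertMany(docs) method, such
// as db.foo in the mongo shell used in this tutorial.
function insertInBatches(collection, total, batchSize) {
  let batchCount = 0;
  for (let start = 0; start < total; start += batchSize) {
    const end = Math.min(start + batchSize, total);
    const docs = [];
    for (let i = start; i < end; i++) docs.push({ x: i });
    collection.insertMany(docs);
    batchCount++;
  }
  return batchCount;
}

// Stand-in collection that only counts what it receives.
const fakeCollection = { count: 0, insertMany(docs) { this.count += docs.length; } };
const batches = insertInBatches(fakeCollection, 999999, 10000);
console.log(batches, fakeCollection.count); // 100 999999
```

In the mongo shell, the equivalent call would be insertInBatches(db.foo, 999999, 10000) after switching to the tutorial database.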
Sharding has to be enabled manually for each database. Connect to the mongos instance and run sh.enableSharding("<dbname>").
mongos> sh.enableSharding("tutorial")
{
"ok" : 1,
"operationTime" : Timestamp(1604638168, 21),
"$clusterTime" : {
"clusterTime" : Timestamp(1604638168, 21),
"signature" : {
"hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
"keyId" : NumberLong(0)
}
}
}
If you run sh.status() now, the entry for the tutorial database shows "partitioned" : true.
Now we shard our collection using key x. To shard a collection, we first have to create an index on the key and then run sh.shardCollection("<db_name>.<collection_name>", <key>).
mongos> db.foo.createIndex({x: 1})
{
"raw" : {
"b/127.0.0.1:27101,127.0.0.1:27102,localhost:27100" : {
"createdCollectionAutomatically" : false,
"numIndexesBefore" : 1,
"numIndexesAfter" : 2,
"ok" : 1
}
},
"ok" : 1,
"operationTime" : Timestamp(1604638185, 9),
"$clusterTime" : {
"clusterTime" : Timestamp(1604638185, 9),
"signature" : {
"hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
"keyId" : NumberLong(0)
}
}
}
mongos> sh.shardCollection("tutorial.foo", {x: 1})
{
"collectionsharded" : "tutorial.foo",
"collectionUUID" : UUID("b6506a90-dc0f-48d2-ba22-c15bbc94c0d6"),
"ok" : 1,
"operationTime" : Timestamp(1604638203, 39),
"$clusterTime" : {
"clusterTime" : Timestamp(1604638203, 39),
"signature" : {
"hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
"keyId" : NumberLong(0)
}
}
}
The collection is now sharded. To check how it is distributed across shards, including the number of chunks (nchunks), run the following command:
mongos> use tutorial
mongos> db.foo.stats()
To know the distribution of chunks across all databases, run the command below:
mongos> sh.status()
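If you only want the number of chunks per shard, the chunk metadata can be tallied directly. The sketch below counts chunks per shard from an array of chunk documents, using only their shard field. The function chunksPerShard and the sample data are hypothetical; the sample is not output from the tutorial cluster.

```javascript
// Count chunks per shard from an array of chunk metadata documents.
// Only the "shard" field of each document is used.
function chunksPerShard(chunkDocs) {
  const counts = {};
  for (const doc of chunkDocs) {
    counts[doc.shard] = (counts[doc.shard] || 0) + 1;
  }
  return counts;
}

// Hypothetical sample data, not output from the tutorial cluster.
const sample = [
  { ns: "tutorial.foo", shard: "a" },
  { ns: "tutorial.foo", shard: "b" },
  { ns: "tutorial.foo", shard: "b" },
  { ns: "tutorial.foo", shard: "c" },
];
console.log(chunksPerShard(sample)); // { a: 1, b: 2, c: 1 }
```

On the MongoDB 4.2 release used here, the chunk documents are kept in the chunks collection of the config database, so, assuming that layout, something like chunksPerShard(db.getSiblingDB("config").chunks.find({ns: "tutorial.foo"}).toArray()) in the mongo shell should give the real distribution.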
Let’s run some queries.
mongos> use tutorial
mongos> db.foo.find({x: 55})
To view the query plan and understand how the cluster routes the query through different servers to get the document, append .explain() to the query.
mongos> db.foo.find({x: 55}).explain()
{
"queryPlanner" : {
"mongosPlannerVersion" : 1,
"winningPlan" : {
"stage" : "SINGLE_SHARD",
"shards" : [
{
"shardName" : "b",
"connectionString" : "b/127.0.0.1:27101,127.0.0.1:27102,localhost:27100",
"serverInfo" : {
"host" : "Piyushs-MacBook-Pro.local",
"port" : 27100,
"version" : "4.2.8",
"gitVersion" : "43d25964249164d76d5e04dd6cf38f6111e21f5f"
},
"plannerVersion" : 1,
"namespace" : "tutorial.foo",
"indexFilterSet" : false,
"parsedQuery" : {
"x" : {
"$eq" : 55
}
},
"queryHash" : "716F281A",
"planCacheKey" : "0FA0E5FD",
"winningPlan" : {
"stage" : "FETCH",
"inputStage" : {
"stage" : "SHARDING_FILTER",
"inputStage" : {
"stage" : "IXSCAN",
"keyPattern" : {
"x" : 1
},
"indexName" : "x_1",
"isMultiKey" : false,
"multiKeyPaths" : {
"x" : [ ]
},
"isUnique" : false,
"isSparse" : false,
"isPartial" : false,
"indexVersion" : 2,
"direction" : "forward",
"indexBounds" : {
"x" : [
"[55.0, 55.0]"
]
}
}
}
},
"rejectedPlans" : [ ]
}
]
}
},
"serverInfo" : {
"host" : "Piyushs-MacBook-Pro.local",
"port" : 27017,
"version" : "4.2.8",
"gitVersion" : "43d25964249164d76d5e04dd6cf38f6111e21f5f"
},
"ok" : 1,
"operationTime" : Timestamp(1604638250, 30),
"$clusterTime" : {
"clusterTime" : Timestamp(1604638255, 27),
"signature" : {
"hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
"keyId" : NumberLong(0)
}
}
}
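The part of this output that matters most for sharding is the top-level winningPlan: its stage is SINGLE_SHARD and its shards array has one entry, shard b, which means mongos sent the query to a single shard because the filter is on the shard key. A query that doesn't include the shard key would generally have to be sent to every shard. As a small sketch, the relevant fields can be pulled out of an explain document like this (the function name targetedShards is ours, and the input below is a trimmed-down copy of the output above):

```javascript
// Summarize which shards a query was routed to, given explain() output shaped
// like the example above (queryPlanner.winningPlan.stage and .shards).
function targetedShards(explainOutput) {
  const plan = explainOutput.queryPlanner.winningPlan;
  return {
    stage: plan.stage,
    shards: (plan.shards || []).map((s) => s.shardName),
  };
}

// Trimmed-down copy of the explain output shown above.
const explainOutput = {
  queryPlanner: {
    winningPlan: { stage: "SINGLE_SHARD", shards: [{ shardName: "b" }] },
  },
};
const summary = targetedShards(explainOutput);
console.log(summary.stage, summary.shards.join(",")); // SINGLE_SHARD b
```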
In the second part of this two-part series, we will explore various methods by which we can select a shard key.