MongoDB Sharding 101: Creating a Sharded Cluster
Sharding is a process in MongoDB that divides large data sets into smaller volumes of data and distributes them across multiple MongoDB instances. When the data within MongoDB is huge, queries run against such large data sets can lead to high CPU usage. MongoDB leverages sharding to tackle this. During this process, the large database is split into shards (subsets of data) that logically function as a collection. Shards are implemented through clusters that consist of shard servers, config servers, and query routers (mongos processes).
In this two-part series about sharding in MongoDB, we explain how to create a sharded cluster and how to choose a shard key. In this first part, we provide a tutorial on how to create a sharded cluster and explain the basics of sharding along the way.
To create a cluster, we first need to be clear about the difference between the mongod and mongos processes, and about the cluster topology: the number of shards, the replication factor, and where the config servers and query routers will run.
mongod and mongos are two different processes in MongoDB. mongod is the primary daemon that handles data requests and data access and performs background management operations, whereas mongos is the query router that processes and routes client queries and determines where data lives in the sharded cluster. The shards and config servers run the mongod process, whereas the query router servers run the mongos process.
For our illustration, let's take 4 shards (named a, b, c, and d) and a replication factor of 3, so in total there will be 12 shard servers (mongod processes). We'll also run 4 query routers (mongos processes) and 3 config servers (lightweight mongod processes), the same components you would run in a production environment, but we'll run all of these processes on a single machine to simulate a cluster.
In a real deployment, these shard members would run on 12 different machines. Config servers, however, are lightweight processes, and with only 4 shards their load is considerably low, so you can run each config server on one of the shard servers. The mongos processes can run either on a shard server or directly on the client application machine.
The benefit of running mongos on the client application machine is that queries stay on a local interface without leaving the machine's network; however, you must remember to enable the right security options. If you instead run it on a shard server or any other server, the client does not need to be involved in setting up the cluster.
The first step in the process is to create data directories for our shard servers, config servers, and logs.
Tue Oct 27 piyushgoyal mongo_tutorial $ mkdir -p data/a0
Tue Oct 27 piyushgoyal mongo_tutorial $ mkdir -p data/a1
Tue Oct 27 piyushgoyal mongo_tutorial $ mkdir -p data/a2
Tue Oct 27 piyushgoyal mongo_tutorial $ mkdir -p data/b0
Tue Oct 27 piyushgoyal mongo_tutorial $ mkdir -p data/b1
Tue Oct 27 piyushgoyal mongo_tutorial $ mkdir -p data/b2
Tue Oct 27 piyushgoyal mongo_tutorial $ mkdir -p data/c0
Tue Oct 27 piyushgoyal mongo_tutorial $ mkdir -p data/c1
Tue Oct 27 piyushgoyal mongo_tutorial $ mkdir -p data/c2
Tue Oct 27 piyushgoyal mongo_tutorial $ mkdir -p data/d0
Tue Oct 27 piyushgoyal mongo_tutorial $ mkdir -p data/d1
Tue Oct 27 piyushgoyal mongo_tutorial $ mkdir -p data/d2
Tue Oct 27 piyushgoyal mongo_tutorial $ mkdir -p config/cfg0
Tue Oct 27 piyushgoyal mongo_tutorial $ mkdir -p config/cfg1
Tue Oct 27 piyushgoyal mongo_tutorial $ mkdir -p config/cfg2
Tue Oct 27 piyushgoyal mongo_tutorial $ mkdir logs
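If your shell supports brace expansion, the same directories can be created in a single command; this is just a shorthand equivalent of the mkdir commands above:

mkdir -p data/{a,b,c,d}{0,1,2} config/cfg{0,1,2} logs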
Use the mongod command, as shown below, to start the config servers. The --configsvr parameter declares that the instance is a config server for the cluster, and --replSet cfg places all three in a single replica set named cfg.
Tue Oct 27 piyushgoyal mongo_tutorial $ mongod --configsvr --replSet cfg --dbpath config/cfg0 --port 26050 --fork --logpath logs/log.cfg0 --logappend
Tue Oct 27 piyushgoyal mongo_tutorial $ mongod --configsvr --replSet cfg --dbpath config/cfg1 --port 26051 --fork --logpath logs/log.cfg1 --logappend
Tue Oct 27 piyushgoyal mongo_tutorial $ mongod --configsvr --replSet cfg --dbpath config/cfg2 --port 26052 --fork --logpath logs/log.cfg2 --logappend
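Before moving on, you can optionally confirm that a config server is up by pinging it from the shell; a minimal check, using the same port as above:

mongo --port 26050 --eval 'db.adminCommand({ ping: 1 })'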
Now let's start our shard servers. Here we run the mongod command with the --shardsvr parameter, which declares that the instance is part of a sharded cluster, and we name each replica set after its shard (a, b, c, or d).
Tue Oct 27 piyushgoyal mongo_tutorial $ mongod --shardsvr --replSet a --dbpath data/a0 --port 27000 --fork --logpath logs/log.a0 --logappend
Tue Oct 27 piyushgoyal mongo_tutorial $ mongod --shardsvr --replSet a --dbpath data/a1 --port 27001 --fork --logpath logs/log.a1 --logappend
Tue Oct 27 piyushgoyal mongo_tutorial $ mongod --shardsvr --replSet a --dbpath data/a2 --port 27002 --fork --logpath logs/log.a2 --logappend
Tue Oct 27 piyushgoyal mongo_tutorial $ mongod --shardsvr --replSet b --dbpath data/b0 --port 27100 --fork --logpath logs/log.b0 --logappend
Tue Oct 27 piyushgoyal mongo_tutorial $ mongod --shardsvr --replSet b --dbpath data/b1 --port 27101 --fork --logpath logs/log.b1 --logappend
Tue Oct 27 piyushgoyal mongo_tutorial $ mongod --shardsvr --replSet b --dbpath data/b2 --port 27102 --fork --logpath logs/log.b2 --logappend
Tue Oct 27 piyushgoyal mongo_tutorial $ mongod --shardsvr --replSet c --dbpath data/c0 --port 27200 --fork --logpath logs/log.c0 --logappend
Tue Oct 27 piyushgoyal mongo_tutorial $ mongod --shardsvr --replSet c --dbpath data/c1 --port 27201 --fork --logpath logs/log.c1 --logappend
Tue Oct 27 piyushgoyal mongo_tutorial $ mongod --shardsvr --replSet c --dbpath data/c2 --port 27202 --fork --logpath logs/log.c2 --logappend
Tue Oct 27 piyushgoyal mongo_tutorial $ mongod --shardsvr --replSet d --dbpath data/d0 --port 27300 --fork --logpath logs/log.d0 --logappend
Tue Oct 27 piyushgoyal mongo_tutorial $ mongod --shardsvr --replSet d --dbpath data/d1 --port 27301 --fork --logpath logs/log.d1 --logappend
Tue Oct 27 piyushgoyal mongo_tutorial $ mongod --shardsvr --replSet d --dbpath data/d2 --port 27302 --fork --logpath logs/log.d2 --logappend
Next, we run the mongos command to start the mongos processes. To tell each router where the config servers are, we pass the config server replica set name and addresses using the --configdb parameter. Only one mongos runs on the standard MongoDB TCP port (27017), as is best practice; the others use separate ports.
Tue Oct 27 piyushgoyal mongo_tutorial $ mongos --configdb cfg/127.0.0.1:26050,127.0.0.1:26051,127.0.0.1:26052 --logpath logs/log.mongos0 --port 27017 --bind_ip 0.0.0.0 --fork --logappend
Tue Oct 27 piyushgoyal mongo_tutorial $ mongos --configdb cfg/127.0.0.1:26050,127.0.0.1:26051,127.0.0.1:26052 --logpath logs/log.mongos1 --port 26061 --bind_ip 0.0.0.0 --fork --logappend
Tue Oct 27 piyushgoyal mongo_tutorial $ mongos --configdb cfg/127.0.0.1:26050,127.0.0.1:26051,127.0.0.1:26052 --logpath logs/log.mongos2 --port 26062 --bind_ip 0.0.0.0 --fork --logappend
Tue Oct 27 piyushgoyal mongo_tutorial $ mongos --configdb cfg/127.0.0.1:26050,127.0.0.1:26051,127.0.0.1:26052 --logpath logs/log.mongos3 --port 26063 --bind_ip 0.0.0.0 --fork --logappend
Note that we never run the config servers or the shard servers on the standard MongoDB TCP port (27017); that port is reserved for the mongos router that clients connect to.
Now, check if all the processes are running smoothly.
Tue Oct 27 piyushgoyal mongo_tutorial $ ps -A | grep mongo
26745 ?? 0:00.95 mongod --configsvr --replSet cfg --dbpath config/cfg0 --port 26050 --fork --logpath logs/log.cfg0 --logappend
26748 ?? 0:00.90 mongod --configsvr --replSet cfg --dbpath config/cfg1 --port 26051 --fork --logpath logs/log.cfg1 --logappend
26754 ?? 0:00.90 mongod --configsvr --replSet cfg --dbpath config/cfg2 --port 26052 --fork --logpath logs/log.cfg2 --logappend
26757 ?? 0:00.77 mongod --shardsvr --replSet a --dbpath data/a0 --port 27000 --fork --logpath logs/log.a0 --logappend
26760 ?? 0:00.77 mongod --shardsvr --replSet a --dbpath data/a1 --port 27001 --fork --logpath logs/log.a1 --logappend
26763 ?? 0:00.76 mongod --shardsvr --replSet a --dbpath data/a2 --port 27002 --fork --logpath logs/log.a2 --logappend
26766 ?? 0:00.76 mongod --shardsvr --replSet b --dbpath data/b0 --port 27100 --fork --logpath logs/log.b0 --logappend
26769 ?? 0:00.77 mongod --shardsvr --replSet b --dbpath data/b1 --port 27101 --fork --logpath logs/log.b1 --logappend
26772 ?? 0:00.75 mongod --shardsvr --replSet b --dbpath data/b2 --port 27102 --fork --logpath logs/log.b2 --logappend
26775 ?? 0:00.73 mongod --shardsvr --replSet c --dbpath data/c0 --port 27200 --fork --logpath logs/log.c0 --logappend
26784 ?? 0:00.75 mongod --shardsvr --replSet c --dbpath data/c1 --port 27201 --fork --logpath logs/log.c1 --logappend
26791 ?? 0:00.74 mongod --shardsvr --replSet c --dbpath data/c2 --port 27202 --fork --logpath logs/log.c2 --logappend
26794 ?? 0:00.77 mongod --shardsvr --replSet d --dbpath data/d0 --port 27300 --fork --logpath logs/log.d0 --logappend
26797 ?? 0:00.75 mongod --shardsvr --replSet d --dbpath data/d1 --port 27301 --fork --logpath logs/log.d1 --logappend
26800 ?? 0:00.71 mongod --shardsvr --replSet d --dbpath data/d2 --port 27302 --fork --logpath logs/log.d2 --logappend
26803 ?? 0:00.00 mongos --configdb cfg/127.0.0.1:26050,127.0.0.1:26051,127.0.0.1:26052 --logpath logs/log.mongos0 --port 27017 --bind_ip 0.0.0.0 --fork --logappend
26804 ?? 0:00.24 mongos --configdb cfg/127.0.0.1:26050,127.0.0.1:26051,127.0.0.1:26052 --logpath logs/log.mongos0 --port 27017 --bind_ip 0.0.0.0 --fork --logappend
76826 ?? 8:58.30 /usr/local/opt/mongodb-community/bin/mongod --config /usr/local/etc/mongod.conf
26817 ttys009 0:00.00 grep mongo
26801 ttys016 0:00.01 mongos --configdb cfg/127.0.0.1:26050,127.0.0.1:26051,127.0.0.1:26052 --logpath logs/log.mongos0 --port 27017 --bind_ip 0.0.0.0 --fork --logappend
Connect to one of the shard servers. For instance, let's connect to the first member of shard a, which listens on port 27000.
Tue Oct 27 piyushgoyal mongo_tutorial $ mongo --port 27000
Run the following command to check the status of the replica set:
> rs.status()
{
"operationTime" : Timestamp(0, 0),
"ok" : 0,
"errmsg" : "no replset config has been received",
"code" : 94,
"codeName" : "NotYetInitialized",
"$clusterTime" : {
"clusterTime" : Timestamp(0, 0),
"signature" : {
"hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
"keyId" : NumberLong(0)
}
}
}
The error message indicates that rs.initiate() has not yet been run for this set. To initiate the replica set, run the following command:
> rs.initiate()
{
"info2" : "no configuration specified. Using a default configuration for the set",
"me" : "localhost:27000",
"ok" : 1,
"$clusterTime" : {
"clusterTime" : Timestamp(1604637446, 1),
"signature" : {
"hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
"keyId" : NumberLong(0)
}
},
"operationTime" : Timestamp(1604637446, 1)
}
If you run rs.status() once again, you’ll see that a single member has been added. Now, let’s add others as well to this replica set.
a:PRIMARY> rs.add("127.0.0.1:27001")
{
"ok" : 1,
"$clusterTime" : {
"clusterTime" : Timestamp(1604637486, 1),
"signature" : {
"hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
"keyId" : NumberLong(0)
}
},
"operationTime" : Timestamp(1604637486, 1)
}
a:PRIMARY> rs.add("127.0.0.1:27002")
{
"ok" : 1,
"$clusterTime" : {
"clusterTime" : Timestamp(1604637491, 1),
"signature" : {
"hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
"keyId" : NumberLong(0)
}
},
"operationTime" : Timestamp(1604637491, 1)
}
Run rs.status() to check the status of the initiation. Once it has been initiated, repeat the same process for the other shards and for the config server replica set (a one-step alternative is sketched below).
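Instead of calling rs.initiate() and then rs.add() twice, you can initiate each replica set in one step by passing an explicit configuration document. A minimal sketch for shard b, using the member addresses from the commands above:

// Run against one member of shard b, e.g. mongo --port 27100
rs.initiate({
  _id: "b",                                   // must match the --replSet name
  members: [
    { _id: 0, host: "127.0.0.1:27100" },
    { _id: 1, host: "127.0.0.1:27101" },
    { _id: 2, host: "127.0.0.1:27102" }
  ]
})

For the config server replica set, the configuration document would additionally need "configsvr: true".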
Connect to one of the mongos processes to add the shards to the cluster.
Tue Oct 27 piyushgoyal mongo_tutorial $ mongo --port 27017
Run the command below to add shards:
mongos> sh.addShard("a/127.0.0.1:27000")
{
"shardAdded" : "a",
"ok" : 1,
"operationTime" : Timestamp(1604637907, 8),
"$clusterTime" : {
"clusterTime" : Timestamp(1604637907, 8),
"signature" : {
"hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
"keyId" : NumberLong(0)
}
}
}
mongos> sh.addShard("b/127.0.0.1:27100")
{
"shardAdded" : "b",
"ok" : 1,
"operationTime" : Timestamp(1604638045, 6),
"$clusterTime" : {
"clusterTime" : Timestamp(1604638045, 6),
"signature" : {
"hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
"keyId" : NumberLong(0)
}
}
}
mongos> sh.addShard("c/127.0.0.1:27200")
{
"shardAdded" : "c",
"ok" : 1,
"operationTime" : Timestamp(1604638065, 4),
"$clusterTime" : {
"clusterTime" : Timestamp(1604638065, 4),
"signature" : {
"hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
"keyId" : NumberLong(0)
}
}
}
mongos> sh.addShard("d/127.0.0.1:27300")
{
"shardAdded" : "d",
"ok" : 1,
"operationTime" : Timestamp(1604638086, 6),
"$clusterTime" : {
"clusterTime" : Timestamp(1604638086, 6),
"signature" : {
"hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
"keyId" : NumberLong(0)
}
}
}
Note: When you run sh.addShard("a/127.0.0.1:27000"), if you get an output as shown below, try running sh.addShard("a/127.0.0.1:27001") or sh.addShard("a/127.0.0.1:27002") instead.
{
"ok" : 0,
"errmsg" : "in seed list a/127.0.0.1:27000, host 127.0.0.1:27000 does not belong to replica set a; found { hosts: [ \"localhost:27000\", \"127.0.0.1:27001\", \"127.0.0.1:27002\" ], setName: \"a\", setVersion: 3, ismaster: true, secondary: false, primary: \"localhost:27000\", me: \"localhost:27000\", electionId: ObjectId('7fffffff0000000000000001'), lastWrite: { opTime: { ts: Timestamp(1604637886, 1), t: 1 }, lastWriteDate: new Date(1604637886000), majorityOpTime: { ts: Timestamp(1604637886, 1), t: 1 }, majorityWriteDate: new Date(1604637886000) }, maxBsonObjectSize: 16777216, maxMessageSizeBytes: 48000000, maxWriteBatchSize: 100000, localTime: new Date(1604637894239), logicalSessionTimeoutMinutes: 30, connectionId: 21, minWireVersion: 0, maxWireVersion: 8, readOnly: false, compression: [ \"snappy\", \"zstd\", \"zlib\" ], ok: 1.0, $clusterTime: { clusterTime: Timestamp(1604637886, 1), signature: { hash: BinData(0, 0000000000000000000000000000000000000000), keyId: 0 } }, operationTime: Timestamp(1604637886, 1) }",
"code" : 96,
"codeName" : "OperationFailed",
"operationTime" : Timestamp(1604637888, 1),
"$clusterTime" : {
"clusterTime" : Timestamp(1604637888, 1),
"signature" : {
"hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
"keyId" : NumberLong(0)
}
}
}
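The error above means the replica set recorded one of its members as localhost:27000 rather than 127.0.0.1:27000. Besides retrying with a different seed host, you can reconfigure the member to advertise a consistent address. A minimal sketch, run on the shard's primary; the member index 0 is an assumption based on the output above:

// On the primary of shard a (e.g. mongo --port 27000)
cfg = rs.conf()
cfg.members[0].host = "127.0.0.1:27000"   // replace the localhost entry
rs.reconfig(cfg)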
Sharding a collection
Run sh.status() to check if the shards have been added.
Now that we have set up the cluster, let’s test it.
When you shard a collection, you must choose a shard key; without one, sharding is impossible. The shard key determines how documents are distributed among the members of the cluster.
A contiguous range of shard-key values stored together is called a chunk. The chunk size can be configured to anything between 1 and 1024 megabytes (the default is 64 MB).
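If you want to experiment with a different chunk size, it is stored in the config database and can be changed through mongos; a minimal sketch, where the 64 MB value is just an example:

// Run in the mongos shell; value is the chunk size in megabytes
use config
db.settings.updateOne(
  { _id: "chunksize" },
  { $set: { value: 64 } },
  { upsert: true }
)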
For example, let’s assume a collection of the following documents:
{
"_id": ObjectId("5f97d97eb7a0a940f157ebc8"),
"x": 1
}
And we use key x to shard the collection. The data is distributed as shown below:
x_low x_high shard
0 1000 S2 ----> chunk
1000 2000 S0 ----> chunk
2000 3000 S1 ----> chunk
The collection's documents are distributed based on the assigned shard key, and documents with the same shard-key value always reside on the same shard.
The process of sharding can be divided into two approaches: range-based sharding and hashed sharding.
To keep this tutorial simple, we use range-based sharding, which is helpful for queries involving a range of shard-key values. Alongside distributing the data, the system also carries out two key operations: splitting chunks and migrating chunks between shards.
For example, if a chunk exceeds the prescribed size, the system automatically finds an appropriate median key and divides the chunk into two or more parts. This is known as a split, an inexpensive metadata-only change. Migration, on the other hand, maintains the balance by moving chunks from one shard to another. This is an expensive operation, since it can involve transferring up to a full chunk's worth of data (as much as 1024 MB).
You can still read and write to the database while a migration is in progress. There is a liveness property to these migrations that keeps the database available for operations, so no big lock occurs.
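Migrations are driven by the balancer. If you want to see whether it is enabled or currently moving chunks, the following shell helpers can be run through mongos; this is an optional check, not required for the tutorial:

sh.getBalancerState()     // true if the balancer is enabled
sh.isBalancerRunning()    // true if a balancing round is in progress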
For our example, we’re creating a database named tutorial with collection foo and are going to insert some documents.
Tue Oct 27 piyushgoyal mongo_tutorial $ mongo --port 27017
mongos> use tutorial
mongos> for(var i=0; i<999999; i++) { db.foo.insert({x:i}) }
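Inserting a million documents one at a time is slow. A faster, equivalent load using batched insertMany calls is sketched below; the batch size of 10,000 is an arbitrary choice:

// Run in the mongos shell after `use tutorial`
var batch = [];
for (var i = 0; i < 999999; i++) {
    batch.push({ x: i });
    if (batch.length === 10000) {   // flush every 10,000 documents
        db.foo.insertMany(batch);
        batch = [];
    }
}
if (batch.length > 0) db.foo.insertMany(batch);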
To allow sharding for the database, we have to enable it manually. Connect to the mongos instance and run sh.enableSharding("<dbname>").
mongos> sh.enableSharding("tutorial")
{
"ok" : 1,
"operationTime" : Timestamp(1604638168, 21),
"$clusterTime" : {
"clusterTime" : Timestamp(1604638168, 21),
"signature" : {
"hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
"keyId" : NumberLong(0)
}
}
}
If you now run sh.status(), the tutorial database is listed with "partitioned": true.
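You can also verify this directly from the config database through mongos; an optional check:

mongos> db.getSiblingDB("config").databases.find({ _id: "tutorial" })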
Now we shard our collection using the key x. To shard a collection, we first have to create an index on the key and then run sh.shardCollection("<db_name>.<collection_name>", <key>). (ensureIndex, used below, is an older alias for createIndex.)
mongos> db.foo.ensureIndex({x: 1})
{
"raw" : {
"b/127.0.0.1:27101,127.0.0.1:27102,localhost:27100" : {
"createdCollectionAutomatically" : false,
"numIndexesBefore" : 1,
"numIndexesAfter" : 2,
"ok" : 1
}
},
"ok" : 1,
"operationTime" : Timestamp(1604638185, 9),
"$clusterTime" : {
"clusterTime" : Timestamp(1604638185, 9),
"signature" : {
"hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
"keyId" : NumberLong(0)
}
}
}
mongos> sh.shardCollection("tutorial.foo", {x: 1})
{
"collectionsharded" : "tutorial.foo",
"collectionUUID" : UUID("b6506a90-dc0f-48d2-ba22-c15bbc94c0d6"),
"ok" : 1,
"operationTime" : Timestamp(1604638203, 39),
"$clusterTime" : {
"clusterTime" : Timestamp(1604638203, 39),
"signature" : {
"hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
"keyId" : NumberLong(0)
}
}
}
The collection is now sharded. To check the shard distribution and the number of chunks (nchunks), run the following commands:
mongos> use tutorial
mongos> db.foo.stats()
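Another helper that summarizes how documents and data are spread across the shards, and is often easier to read than the raw stats output, is getShardDistribution:

mongos> db.foo.getShardDistribution()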
To know the distribution of chunks across all databases, run the command below:
mongos> sh.status()
Let’s run some queries.
mongos> use tutorial
mongos> db.foo.find({x: 55})
To view the query plan and understand how the cluster routes the query across servers to find the document, append .explain() to the end of the query.
mongos> db.foo.find({x: 55}).explain()
{
"queryPlanner" : {
"mongosPlannerVersion" : 1,
"winningPlan" : {
"stage" : "SINGLE_SHARD",
"shards" : [
{
"shardName" : "b",
"connectionString" : "b/127.0.0.1:27101,127.0.0.1:27102,localhost:27100",
"serverInfo" : {
"host" : "Piyushs-MacBook-Pro.local",
"port" : 27100,
"version" : "4.2.8",
"gitVersion" : "43d25964249164d76d5e04dd6cf38f6111e21f5f"
},
"plannerVersion" : 1,
"namespace" : "tutorial.foo",
"indexFilterSet" : false,
"parsedQuery" : {
"x" : {
"$eq" : 55
}
},
"queryHash" : "716F281A",
"planCacheKey" : "0FA0E5FD",
"winningPlan" : {
"stage" : "FETCH",
"inputStage" : {
"stage" : "SHARDING_FILTER",
"inputStage" : {
"stage" : "IXSCAN",
"keyPattern" : {
"x" : 1
},
"indexName" : "x_1",
"isMultiKey" : false,
"multiKeyPaths" : {
"x" : [ ]
},
"isUnique" : false,
"isSparse" : false,
"isPartial" : false,
"indexVersion" : 2,
"direction" : "forward",
"indexBounds" : {
"x" : [
"[55.0, 55.0]"
]
}
}
}
},
"rejectedPlans" : [ ]
}
]
}
},
"serverInfo" : {
"host" : "Piyushs-MacBook-Pro.local",
"port" : 27017,
"version" : "4.2.8",
"gitVersion" : "43d25964249164d76d5e04dd6cf38f6111e21f5f"
},
"ok" : 1,
"operationTime" : Timestamp(1604638250, 30),
"$clusterTime" : {
"clusterTime" : Timestamp(1604638255, 27),
"signature" : {
"hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
"keyId" : NumberLong(0)
}
}
}
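Because the filter above includes the shard key, mongos can target a single shard (the SINGLE_SHARD stage). For contrast, a query that does not include the shard key has to be broadcast to all shards; you can confirm this by explaining such a query, as in the hypothetical example below:

// _id is not the shard key here, so the winning plan targets every shard
// (a scatter-gather plan) instead of a single shard.
mongos> db.foo.find({ _id: ObjectId("5f97d97eb7a0a940f157ebc8") }).explain()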
In the second part of this two-part series, we will explore various methods by which we can select a shard key.