Running MongoDB Aggregations on Secondaries
Shifting aggregation work, such as batch operations, to a secondary can save your primaries for more pressing, time-sensitive matters.
Aggregation operations in MongoDB allow you to process data records, group them, and return computed results. MongoDB supports three kinds of aggregation operations: single-purpose aggregation commands, Map-Reduce, and the aggregation pipeline. You can use this MongoDB comparison document to see which fits your needs.
The aggregation pipeline is a MongoDB framework that provides data aggregation via a data processing pipeline. That means documents are sent through a multi-stage pipeline that filters, groups, and otherwise transforms them at each stage. It provides SQL-like “GROUP BY” constructs for MongoDB that run on the database itself. The aggregation documentation provides useful examples of creating such pipelines.
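To make the pipeline model concrete, here is a minimal plain-Python sketch (not MongoDB code; the documents and numbers are made up) of what a $group stage followed by a $match stage computes:

```python
from collections import defaultdict

# Hypothetical zip-code documents: each carries a state and a population.
docs = [
    {"state": "CA", "pop": 6},
    {"state": "CA", "pop": 5},
    {"state": "NY", "pop": 9},
    {"state": "RI", "pop": 1},
]

def group_sum(docs, key, field):
    """Rough analog of {$group: {_id: "$key", totalPop: {$sum: "$field"}}}."""
    totals = defaultdict(int)
    for d in docs:
        totals[d[key]] += d[field]
    return [{"_id": k, "totalPop": v} for k, v in totals.items()]

def match_gte(docs, field, threshold):
    """Rough analog of {$match: {field: {$gte: threshold}}}."""
    return [d for d in docs if d[field] >= threshold]

# Stage 1 feeds stage 2, just as documents flow from stage to stage
# through an aggregation pipeline.
stage1 = group_sum(docs, "state", "pop")
stage2 = match_gte(stage1, "totalPop", 10)
print(stage2)  # only states whose summed population reaches the threshold
```

The real pipeline expresses the same two steps declaratively and runs them inside the database, close to the data.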
Why Run Aggregations on the Secondary?
Aggregation pipelines are resource-intensive operations, so it makes sense to offload aggregation jobs to the secondaries of a MongoDB replica set when it is OK to operate on slightly stale data. This is typically true for ‘batch’ operations, since they don’t expect to run on the latest data. If the output needs to be written to a collection (e.g. with the $out stage), then the aggregation job must run on the primary, since only the primary is writable in MongoDB.
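Under a secondary read preference, the driver routes the query to a secondary member of the replica set. The following is a deliberately simplified, hypothetical sketch of that member-selection idea in plain Python; real drivers also weigh network latency, staleness, and tag sets, so do not take this as the actual driver implementation:

```python
import random

# Hypothetical replica-set member list, as a driver might track it.
members = [
    {"host": "server-1:27017", "state": "PRIMARY"},
    {"host": "server-2:27017", "state": "SECONDARY"},
    {"host": "server-3:27017", "state": "SECONDARY"},
]

def select_member(members, read_preference):
    """Very rough analog of driver server selection for a read operation."""
    if read_preference == "primary":
        candidates = [m for m in members if m["state"] == "PRIMARY"]
    elif read_preference == "secondary":
        candidates = [m for m in members if m["state"] == "SECONDARY"]
    else:
        raise ValueError("read preference not covered by this sketch")
    if not candidates:
        raise RuntimeError("no member satisfies the read preference")
    return random.choice(candidates)

target = select_member(members, "secondary")
print(target["host"])  # always one of the secondaries
```

The point of the sketch: with readPreference=secondary, the primary is never a candidate for the read, which is exactly what keeps aggregation load off it.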
In this post, we will show you how to ensure that aggregation pipelines are executed on the secondary, both from the mongo shell and from Java.
Note: We use the sample data set provided by MongoDB in their zip codes aggregation example to showcase our examples. You can download it as instructed in the example.
Aggregation Pipeline on Replica Sets
MongoDB Shell
Setting the read preference to secondary does the trick when running an aggregation job from the mongo shell. Let’s try to fetch all states with populations greater than 10 million (the first aggregation in the zip codes example). Both the shell and server are running MongoDB version 3.2.10.
mongo -u admin -p <pwd> --authenticationDatabase admin --host RS-repl0-0/server-1.servers.example.com:27017,server-2.servers.example.com:27017
RS-repl0-0:PRIMARY> use test
switched to db test
RS-repl0-0:PRIMARY> db.setSlaveOk() // Ok to run commands on a slave
RS-repl0-0:PRIMARY> db.getMongo().setReadPref('secondary') // Set read pref
RS-repl0-0:PRIMARY> db.getMongo().getReadPrefMode()
secondary
RS-repl0-0:PRIMARY> db.zips.aggregate( [
... { $group: { _id: "$state", totalPop: { $sum: "$pop" } } },
... { $match: { totalPop: { $gte: 10*1000*1000 } } }
... ] )
{ "_id" : "CA", "totalPop" : 29754890 }
{ "_id" : "FL", "totalPop" : 12686644 }
{ "_id" : "PA", "totalPop" : 11881643 }
{ "_id" : "NY", "totalPop" : 17990402 }
{ "_id" : "OH", "totalPop" : 10846517 }
{ "_id" : "IL", "totalPop" : 11427576 }
{ "_id" : "TX", "totalPop" : 16984601 }
A look into the MongoDB logs on the secondary (with command logging enabled) shows that the aggregation indeed ran there:
...
2016-12-05T06:20:14.783+0000 I COMMAND [conn200] command test.zips command: aggregate { aggregate: "zips", pipeline: [ { $group: { _id: "$state", totalPop: { $sum: "$pop" } } }, {
$match: { totalPop: { $gte: 10000000.0 } } } ], cursor: {} } keyUpdates:0 writeConflicts:0 numYields:229 reslen:338 locks:{ Global: { acquireCount: { r: 466 } }, Database: { acquire
Count: { r: 233 } }, Collection: { acquireCount: { r: 233 } } } protocol:op_command 49ms
...
Java
From the MongoDB Java driver, again setting the read preference does the trick. Here’s an example using driver version 3.2.2:
import java.util.Arrays;

import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientOptions;
import com.mongodb.MongoClientOptions.Builder;
import com.mongodb.MongoClientURI;
import com.mongodb.ReadPreference;

public class AggregationChecker {

    /*
     * Data and code inspired from:
     * https://docs.mongodb.com/v3.2/tutorial/aggregation-zip-code-data-set/#return-states-with-populations-above-10-million
     */
    private static final String MONGO_END_POINT = "mongodb://admin:pwd@server-1.servers.example.com:27017,server-2.servers.example.com:27017/admin?replicaSet=RS-repl0-0";
    private static final String COL_NAME = "zips";
    private static final String DEF_DB = "test";

    public AggregationChecker() {
    }

    public static void main(String[] args) {
        AggregationChecker writer = new AggregationChecker();
        writer.aggregationJob();
    }

    private void aggregationJob() {
        printer("Initializing...");
        // Route reads to secondaries via the client options.
        Builder options = MongoClientOptions.builder().readPreference(ReadPreference.secondary());
        MongoClientURI uri = new MongoClientURI(MONGO_END_POINT, options);
        MongoClient client = new MongoClient(uri);
        try {
            final DB db = client.getDB(DEF_DB);
            final DBCollection coll = db.getCollection(COL_NAME);
            // Avg city pop by state: https://docs.mongodb.com/manual/tutorial/aggregation-zip-code-data-set/#return-average-city-population-by-state
            Iterable<DBObject> iterable = coll.aggregate(
                    Arrays.asList(
                            new BasicDBObject("$group",
                                    new BasicDBObject("_id", new BasicDBObject("state", "$state").append("city", "$city"))
                                            .append("pop", new BasicDBObject("$sum", "$pop"))),
                            new BasicDBObject("$group",
                                    new BasicDBObject("_id", "$_id.state").append("avgCityPop", new BasicDBObject("$avg", "$pop")))))
                    .results();
            for (DBObject entry : iterable) {
                printer(entry.toString());
            }
        } finally {
            client.close();
        }
        printer("Done...");
    }

    ...
}
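The two $group stages above (sum the population per city, then average the city totals per state) can be sanity-checked with a plain-Python sketch over made-up data; this only illustrates the arithmetic, not the driver API:

```python
from collections import defaultdict

# Made-up documents: multiple zip-code records per city.
docs = [
    {"state": "CA", "city": "LA", "pop": 4},
    {"state": "CA", "city": "LA", "pop": 6},   # LA total: 10
    {"state": "CA", "city": "SF", "pop": 20},  # SF total: 20
    {"state": "NY", "city": "NYC", "pop": 8},  # NYC total: 8
]

# Stage 1: {$group: {_id: {state: "$state", city: "$city"}, pop: {$sum: "$pop"}}}
city_pop = defaultdict(int)
for d in docs:
    city_pop[(d["state"], d["city"])] += d["pop"]

# Stage 2: {$group: {_id: "$_id.state", avgCityPop: {$avg: "$pop"}}}
state_cities = defaultdict(list)
for (state, _city), pop in city_pop.items():
    state_cities[state].append(pop)
avg_city_pop = {s: sum(v) / len(v) for s, v in state_cities.items()}
print(avg_city_pop)  # {'CA': 15.0, 'NY': 8.0}
```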
Logs on the secondary:
...
2016-12-01T10:54:18.667+0000 I COMMAND [conn4113] command test.zips command: aggregate { aggregate: "zipcodes", pipeline: [ { $group: { _id: { state: "$state", city: "$city" }, pop: { $sum: "$pop" } } }, { $group: { _id: "$_id.state", avgCityPop: { $avg: "$pop" } } } ] } keyUpdates:0 writeConflicts:0 numYields:229 reslen:2149 locks:{ Global: { acquireCount: { r: 466 } }, Database: { acquireCount: { r: 233 } }, Collection: { acquireCount: { r: 233 } } } protocol:op_query 103ms
...
No operation was recorded on the primary.
Aggregation Pipeline on Sharded Clusters
Aggregation pipelines are also supported on sharded clusters, and their detailed behavior is explained in the documentation. Implementation-wise, there is little difference between using an aggregation pipeline on a replica set and on a sharded cluster.
MongoDB Shell
Before importing data into the sharded cluster, enable sharding on the collection.
mongos> sh.enableSharding("test")
mongos> sh.shardCollection("test.zips", { "_id" : "hashed" } )
After that, the operations are the same as on the replica set:
mongos> db.setSlaveOk()
mongos> db.getMongo().setReadPref('secondary')
mongos> db.getMongo().getReadPrefMode()
secondary
mongos> db.zips.aggregate( [
... { $group: { _id: "$state", totalPop: { $sum: "$pop" } } },
... { $match: { totalPop: { $gte: 10*1000*1000 } } }
... ] )
{ "_id" : "TX", "totalPop" : 16984601 }
{ "_id" : "PA", "totalPop" : 11881643 }
{ "_id" : "CA", "totalPop" : 29754890 }
{ "_id" : "FL", "totalPop" : 12686644 }
{ "_id" : "NY", "totalPop" : 17990402 }
{ "_id" : "OH", "totalPop" : 10846517 }
{ "_id" : "IL", "totalPop" : 11427576 }
Logs from one of the secondaries:
...
2016-12-02T05:46:24.627+0000 I COMMAND [conn242] command test.zips command: aggregate { aggregate: "zips", pipeline: [ { $group: { _id: "$state", totalPop: { $sum: "$pop" } } } ], fromRouter: true, cursor: { batchSize: 0 } } cursorid:44258973083 keyUpdates:0 writeConflicts:0 numYields:0 reslen:115 locks:{ Global: { acquireCount: { r: 4 } }, Database: { acquireCount: { r: 2 } }, Collection: { acquireCount: { r: 2 } } } protocol:op_query 0ms
2016-12-02T05:46:24.641+0000 I COMMAND [conn131] getmore test.zips query: { aggregate: "zips", pipeline: [ { $group: { _id: "$state", totalPop: { $sum: "$pop" } } } ], fromRouter: true, cursor: { batchSize: 0 } } planSummary: PIPELINE_PROXY cursorid:44258973083 ntoreturn:0 keysExamined:0 docsExamined:0 cursorExhausted:1 keyUpdates:0 writeConflicts:0 numYields:112 nreturned:51 reslen:1601 locks:{ Global: { acquireCount: { r: 230 } }, Database: { acquireCount: { r: 115 } }, Collection: { acquireCount: { r: 115 } } } 13ms
...
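The fromRouter: true entries above show that each shard ran the $group stage only over the chunks it owns; the mongos router then merges the partial results. A rough plain-Python sketch of that split-and-merge, over hypothetical data spread across two shards:

```python
from collections import defaultdict

# Hypothetical chunks of the zips collection living on two shards.
shard_a = [{"state": "CA", "pop": 7}, {"state": "NY", "pop": 3}]
shard_b = [{"state": "CA", "pop": 4}, {"state": "NY", "pop": 6}]

def partial_group(docs):
    """Each shard computes partial sums for the documents it owns."""
    out = defaultdict(int)
    for d in docs:
        out[d["state"]] += d["pop"]
    return dict(out)

def merge(partials):
    """The router combines the per-shard partial sums into the final result."""
    out = defaultdict(int)
    for p in partials:
        for state, pop in p.items():
            out[state] += pop
    return dict(out)

totals = merge([partial_group(shard_a), partial_group(shard_b)])
print(totals)  # {'CA': 11, 'NY': 9}
```

Because $sum can be computed piecewise and then combined, no single node ever has to see the whole collection.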
Java
The same code used for the replica set works fine with a sharded cluster. Just replace the replica set connection string with that of the sharded cluster. Logs from a secondary indicate that the job indeed ran on the secondaries:
...
2016-12-02T05:39:12.339+0000 I COMMAND [conn130] command test.zips command: aggregate { aggregate: "zips", pipeline: [ { $group: { _id: { state: "$state", city: "$city" }, pop: { $sum: "$pop" } } } ], fromRouter: true, cursor: { batchSize: 0 } } cursorid:44228970872 keyUpdates:0 writeConflicts:0 numYields:0 reslen:115 locks:{ Global: { acquireCount: { r: 4 } }, Database: { acquireCount: { r: 2 } }, Collection: { acquireCount: { r: 2 } } } protocol:op_query 0ms
2016-12-02T05:39:12.371+0000 I COMMAND [conn131] getmore test.zips query: { aggregate: "zips", pipeline: [ { $group: { _id: { state: "$state", city: "$city" }, pop: { $sum: "$pop" } } } ], fromRouter: true, cursor: { batchSize: 0 } } planSummary: PIPELINE_PROXY cursorid:44228970872 ntoreturn:0 keysExamined:0 docsExamined:0 cursorExhausted:1 keyUpdates:0 writeConflicts:0 numYields:112 nreturned:12902 reslen:741403 locks:{ Global: { acquireCount: { r: 230 } }, Database: { acquireCount: { r: 115 } }, Collection: { acquireCount: { r: 115 } } } 30ms
...
Published at DZone with permission of Vaibhaw Pandey, DZone MVB. See the original article here.