Tracking Changes in MongoDB With Scala and Akka
Leverage the change streams feature in MongoDB to capture every change as it occurs in real time.
Need for Real-Time Consistent Data
Many different databases are used at Adform, each tailored to specific requirements, but what these use cases have in common is the need for a consistent interchange of data between the data stores. It’s a tedious task to keep the origin of that data and its copies consistent manually, not to mention that after a sufficiently large number of duplications the origin may no longer be the source of truth. The need for each system to hold its own copy of the data is also dictated by loose coupling and performance: it wouldn’t be practical to be constantly impacted by every change made in the source system. The answer here is an event-based architecture, which keeps every change consistent and lets us restore the sequence of changes related to particular entities. For those reasons, the decision was made to use the publisher/subscriber model. MongoDB’s change streams saved the day, finally letting us say farewell to the much more complex oplog tailing.
Change Streams
As of version 3.6, MongoDB offers a change data capture implementation called change streams. It allows us to follow every modification made to an entire database or to a chosen set of collections. Previous versions already offered a solution to this problem by means of the oplog (operation log) mechanism, but tailing it directly had serious drawbacks, especially the heavy traffic caused by iterating over all changes to all collections and the lack of a reliable API for resuming tracking after an interruption. Change streams solve these issues by hiding the oplog’s nooks and crannies behind a refined API that is interoperable with Reactive Streams implementations.
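For a quick taste of that API before the demo: a filtering predicate can be pushed down to MongoDB as an aggregation pipeline passed to watch(). Below is a minimal sketch using the MongoDB Reactive Streams driver; the connection string and the database and collection names are placeholder assumptions, not part of the demo yet.

import java.util.Collections

import com.mongodb.client.model.{Aggregates, Filters}
import com.mongodb.reactivestreams.client.MongoClients

object FilteredWatchSketch {
  // The $match stage is evaluated by MongoDB itself, so only insert and update
  // events ever reach the client.
  val collection = MongoClients
    .create("mongodb://localhost:27017")
    .getDatabase("test_db")
    .getCollection("test_collection")

  val pipeline = Collections.singletonList(
    Aggregates.`match`(Filters.in("operationType", "insert", "update"))
  )

  // A Reactive Streams Publisher of change events, ready to be plugged into any
  // compatible streaming library (Akka Streams via Alpakka is used later in this article).
  val filteredChanges = collection.watch(pipeline)
}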
Simple Demo Application
I prepared a small application, written in Scala with Akka, that shows change streams in action.
Dockerfile
The Dockerfile I prepared for this demo customizes the MongoDB image with a single-node replica set setup. For change streams to work, a replica set consisting of at least one node must be configured. Even though a single-node setup is not feasible for production, it works well here for simplicity’s sake.
FROM mongo:4.4.1-bionic
RUN echo "rs.initiate();" > /docker-entrypoint-initdb.d/replica-init.js
CMD [ "--replSet", "single_node_replica_set" ]
The replica set is not initiated by default — it has to be done explicitly.
Build Docker Image
Let’s build the image using a user-friendly tag, e.g. single_rs.
$ docker build . -t mongo:single_rs
Run MongoDB in Container
$ docker run -p 27017:27017 -d --name mongo mongo:single_rs
The 27017 port is MongoDB’s default.
Replica Set Verification
We can easily look inside our container using the docker exec command:
$ docker exec -it mongo bash
Then we can get into the MongoDB shell by typing:
# mongo
Check the replica set’s status:
> rs.status()
You should see output similar to the following:
"members" : [
{
"_id" : 0,
"name" : "127.0.0.1:27017",
"health" : 1,
"state" : 1,
"stateStr" : "PRIMARY",
"uptime" : 118,
"optime" : {
"ts" : Timestamp(1601225360, 1),
"t" : NumberLong(2)
},
"optimeDate" : ISODate("2020-09-27T16:49:20Z"),
"syncSourceHost" : "",
"syncSourceId" : -1,
"infoMessage" : "Could not find member to sync from",
"electionTime" : Timestamp(1601225250, 1),
"electionDate" : ISODate("2020-09-27T16:47:30Z"),
"configVersion" : 1,
"configTerm" : 2,
"self" : true,
"lastHeartbeatMessage" : ""
}
],
The replica set consists of only one node, hence the members array has only one object, and the infoMessage field informs us that there is no secondary node to sync from.
Test Database Setup
While we have access to our instance’s shell, we can also create the test database and the test collection we will subscribe to.
> use test_db
> db.createCollection("test_collection")
Application Code
It’s a very simple application that subscribes to the given MongoDB instance and prints incoming records. You will find a link to the full source code at the bottom of this page.
Directory Structure
.
├── build.sbt
├── Dockerfile
├── project
│ └── build.properties
└── src
└── main
└── scala
└── com
└── adform
└── demo
└── Main.scala
build.sbt
lazy val root = (project in file("."))
.settings(
name := "MongoDB Change Tracking Demo",
scalaVersion := "2.12.8",
version := "1.0"
)
libraryDependencies ++= Seq(
"com.lightbend.akka" %% "akka-stream-alpakka-mongodb" % "2.0.2"
)
We need only one dependency — the Alpakka MongoDB connector, which is a Reactive Streams implementation.
build.properties
sbt.version=1.3.4
Rather self-explanatory — it’s just the required sbt version.
Main.scala
package com.adform.demo
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.alpakka.mongodb.scaladsl.MongoSource
import akka.stream.scaladsl.Sink
import com.mongodb.reactivestreams.client.MongoClients
import com.mongodb.{ConnectionString, MongoClientSettings}
object Main {
  def main(args: Array[String]): Unit = {
    // Point the driver at the MongoDB container started earlier.
    val settings = MongoClientSettings
      .builder()
      .applyConnectionString(new ConnectionString("mongodb://localhost:27017"))
      .build()
    val db = MongoClients.create(settings).getDatabase("test_db")
    val collection = db.getCollection("test_collection")

    // watch() returns a Reactive Streams publisher of change events,
    // which Alpakka wraps into an Akka Streams source.
    val publisher = collection.watch()
    val source = MongoSource(publisher)

    implicit val system: ActorSystem = ActorSystem()
    implicit val mat: ActorMaterializer = ActorMaterializer()

    // Print every incoming change event to standard output.
    source.runWith(Sink.foreach(println))
  }
}
Main class — it’s a simple piece of code that connects to our MongoDB container, listens for incoming events, and prints them to standard output as they arrive.
Run the Application
$ sbt run
You should see initial logs like these:
Sep 28, 2020 11:30:22 PM com.mongodb.diagnostics.logging.JULLogger log
INFO: Cluster created with settings {hosts=[localhost:27017], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms', maxWaitQueueSize=500}
Sep 28, 2020 11:30:22 PM com.mongodb.diagnostics.logging.JULLogger log
INFO: Opened connection [connectionId{localValue:1, serverValue:8}] to localhost:27017
Sep 28, 2020 11:30:22 PM com.mongodb.diagnostics.logging.JULLogger log
INFO: Monitor thread successfully connected to server with description ServerDescription{address=localhost:27017, type=REPLICA_SET_PRIMARY, state=CONNECTED, ok=true, version=ServerVersion{versionList=[4, 4, 1]}, minWireVersion=0, maxWireVersion=9, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=8637134, setName='single_node_replica_set', canonicalAddress=127.0.0.1:27017, hosts=[127.0.0.1:27017], passives=[], arbiters=[], primary='127.0.0.1:27017', tagSet=TagSet{[]}, electionId=7fffffff0000000000000002, setVersion=1, lastWriteDate=Mon Sep 28 23:30:18 CEST 2020, lastUpdateTimeNanos=44003362611716}
Sep 28, 2020 11:30:24 PM com.mongodb.diagnostics.logging.JULLogger log
INFO: Opened connection [connectionId{localValue:2, serverValue:9}] to localhost:27017
Let’s add some records
> db.test_collection.insertMany([{_id: 1, name: "test_record_1"}, {_id: 2, name: "test_record_2"}])
Application’s Output
ChangeStreamDocument{ operationType=OperationType{value='insert'}, resumeToken={"_data": "825F7255FC000000012B022C0100296E5A1004B3CC867FB8934F57A5CD65B672BF625F461E5F6964002B020004", "_typeBits": {"$binary": "QA==", "$type": "00"}}, namespace=test_db.test_collection, destinationNamespace=null, fullDocument=Document{{_id=1.0, name=test_record_1}}, documentKey={"_id": 1.0}, clusterTime=Timestamp{value=6877654121768288257, seconds=1601328636, inc=1}, updateDescription=null, txnNumber=null, lsid=null}
ChangeStreamDocument{ operationType=OperationType{value='insert'}, resumeToken={"_data": "825F7255FC000000022B022C0100296E5A1004B3CC867FB8934F57A5CD65B672BF625F461E5F6964002B040004", "_typeBits": {"$binary": "QA==", "$type": "00"}}, namespace=test_db.test_collection, destinationNamespace=null, fullDocument=Document{{_id=2.0, name=test_record_2}}, documentKey={"_id": 2.0}, clusterTime=Timestamp{value=6877654121768288258, seconds=1601328636, inc=2}, updateDescription=null, txnNumber=null, lsid=null}
We see that the two inserted records (insert OperationType, with the document placed in the fullDocument field) were consumed. It’s worth noting that with every record we also get a resume token — it can be persisted and used to restart the stream after any interruption (e.g. an application crash), as sketched below.
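To illustrate, here is a rough sketch of how such a token could be wired into the demo using the driver's resumeAfter option. The ResumableMain name and the in-memory token store are purely illustrative assumptions; a real application would persist the token somewhere durable.

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.alpakka.mongodb.scaladsl.MongoSource
import akka.stream.scaladsl.Sink
import com.mongodb.reactivestreams.client.MongoClients
import org.bson.BsonDocument

object ResumableMain {

  // Placeholder token store: a real application would persist the token durably
  // (a file, another collection, etc.) and read it back after a restart.
  @volatile private var storedToken: Option[BsonDocument] = None

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem()
    implicit val mat: ActorMaterializer = ActorMaterializer()

    val collection = MongoClients
      .create("mongodb://localhost:27017")
      .getDatabase("test_db")
      .getCollection("test_collection")

    // If a token was saved before the interruption, resume right after it;
    // otherwise start a fresh stream from "now".
    val publisher = storedToken
      .map(token => collection.watch().resumeAfter(token))
      .getOrElse(collection.watch())

    MongoSource(publisher).runWith(Sink.foreach { change =>
      storedToken = Some(change.getResumeToken) // remember progress after each event
      println(change)
    })
  }
}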
Execute some more operations
> db.test_collection.remove({_id: 1})
> db.test_collection.update({_id: 2}, {"name": "updated name"})
> db.test_collection.drop()
Let’s take a look at the output produced:
ChangeStreamDocument{ operationType=OperationType{value='delete'}, resumeToken={"_data": "825F73A158000000022B022C0100296E5A1004B3CC867FB8934F57A5CD65B672BF625F461E5F6964002B020004", "_typeBits": {"$binary": "QA==", "$type": "00"}}, namespace=test_db.test_collection, destinationNamespace=null, fullDocument=null, documentKey={"_id": 1.0}, clusterTime=Timestamp{value=6878018455254073346, seconds=1601413464, inc=2}, updateDescription=null, txnNumber=null, lsid=null}
ChangeStreamDocument{ operationType=OperationType{value='replace'}, resumeToken={"_data": "825F73A199000000012B022C0100296E5A1004B3CC867FB8934F57A5CD65B672BF625F461E5F6964002B040004", "_typeBits": {"$binary": "QA==", "$type": "00"}}, namespace=test_db.test_collection, destinationNamespace=null, fullDocument=Document{{_id=2.0, name=updated name}}, documentKey={"_id": 2.0}, clusterTime=Timestamp{value=6878018734426947585, seconds=1601413529, inc=1}, updateDescription=null, txnNumber=null, lsid=null}
ChangeStreamDocument{ operationType=OperationType{value='drop'}, resumeToken={"_data": "825F73A1C3000000012B022C0100296E5A1004B3CC867FB8934F57A5CD65B672BF625F04"}, namespace=test_db.test_collection, destinationNamespace=null, fullDocument=null, documentKey=null, clusterTime=Timestamp{value=6878018914815574017, seconds=1601413571, inc=1}, updateDescription=null, txnNumber=null, lsid=null}
ChangeStreamDocument{ operationType=OperationType{value='invalidate'}, resumeToken={"_data": "825F73A1C3000000012B022C0100296F5A1004B3CC867FB8934F57A5CD65B672BF625F04"}, namespace=null, destinationNamespace=null, fullDocument=null, documentKey=null, clusterTime=Timestamp{value=6878018914815574017, seconds=1601413571, inc=1}, updateDescription=null, txnNumber=null, lsid=null}
We got four different change events. The first three correspond, in the same order, to the operations executed in MongoDB in the previous step. The last one is special: it signals the stream’s closure. It’s important to notice that delete events come without the full document, but we are still able to identify what was removed using the documentKey property. After an event indicating a drop of the database/collection or a change of its name, we get an invalidate event, which ends the stream, and we need to recover from it manually (one possible way is sketched below).
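A hedged sketch of such a recovery, assuming MongoDB 4.2+ where a new stream can be started right after the invalidate event's resume token; the helper name is made up, and after a drop the collection would of course also have to exist again before new events appear.

import akka.NotUsed
import akka.stream.alpakka.mongodb.scaladsl.MongoSource
import akka.stream.scaladsl.Source
import com.mongodb.client.model.changestream.ChangeStreamDocument
import com.mongodb.reactivestreams.client.MongoCollection
import org.bson.{BsonDocument, Document}

object InvalidateRecovery {
  // Open a brand new change stream that starts right after the invalidate
  // event identified by its resume token (startAfter, MongoDB 4.2+).
  def reopenAfterInvalidate(
      collection: MongoCollection[Document],
      invalidateToken: BsonDocument
  ): Source[ChangeStreamDocument[Document], NotUsed] =
    MongoSource(collection.watch().startAfter(invalidateToken))
}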
Takeaways
- Always prefer change streams over oplog tailing for the friendly API and features like resuming the stream, pushing filtering predicates directly to MongoDB (aggregation pipeline), and total ordering of incoming records
- Supported Reactive Streams implementations give you features like back pressure out of the box
- Change streams work only with replica sets enabled
- The Alpakka library offers more sinks than the Sink.foreach used here for demo purposes — you can easily extend the sample application to sink e.g. to Kafka by plugging in such a sink provided by the library (a sketch follows this list)
- Change streams don’t stream the initial data that already exists in the collection before the subscriber’s session is created — it means that this has to be implemented separately
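For instance, assuming the Alpakka Kafka connector is added as an extra dependency ("com.typesafe.akka" %% "akka-stream-kafka"), a variation of the demo could push change events to a Kafka topic. The broker address, topic name, and toString serialization below are placeholder assumptions, not part of the demo repository.

import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.ActorMaterializer
import akka.stream.alpakka.mongodb.scaladsl.MongoSource
import com.mongodb.reactivestreams.client.MongoClients
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer

object KafkaSinkMain {
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem()
    implicit val mat: ActorMaterializer = ActorMaterializer()

    // Producer settings: the broker address is a placeholder.
    val producerSettings =
      ProducerSettings(system, new StringSerializer, new StringSerializer)
        .withBootstrapServers("localhost:9092")

    val collection = MongoClients
      .create("mongodb://localhost:27017")
      .getDatabase("test_db")
      .getCollection("test_collection")

    // Each change event is rendered with toString for brevity; a real application
    // would pick a proper serialization format (JSON, Avro, ...).
    MongoSource(collection.watch())
      .map(change => new ProducerRecord[String, String]("mongo-changes", change.toString))
      .runWith(Producer.plainSink(producerSettings))
  }
}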
Source Code
https://github.com/hubert-skowronek/mongodb-change-streams-demo