Scala, MongoDB, and Cats-Effect
Mongo4cats: Combining Scala, MongoDB, and Cats-Effect to create non-blocking and asynchronous I/O operations executed in a purely functional way.
MongoDB is an open-source database that uses a document-oriented data model and a non-SQL query language. It is one of the most powerful NoSQL databases around today. In contrast to traditional SQL databases, MongoDB does not use rows and columns to model its data; instead, it stores the data (documents) in collections using the BSON (Binary JSON) format, where the basic unit of data is a set of key-value pairs.
For Scala, there are several MongoDB clients available, the most popular of which are the official MongoDB Scala Driver and ReactiveMongo.
However, in this blog post, I am going to introduce a more recent MongoDB client, mongo4cats, which is a wrapper around the native MongoDB Java client that is compatible with the Cats-Effect (3.x) and FS2 (3.x) libraries. Through this integration with Cats-Effect, MongoDB operations can be run as fully non-blocking and asynchronous I/O, executed in a purely functional way.
About Cats-Effect
Cats-Effect is a high-performance, asynchronous, composable framework for building real-world applications in a purely functional style. As a library, Cats-Effect provides an IO monad that can be used for capturing, controlling, and composing effects (such as making a connection to a database or executing a query), and allows performing them within a resource-safe, typed context. More on it can be found in the Cats-Effect documentation.
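To make "capturing" an effect a little more concrete, here is a minimal sketch that uses only the cats-effect 3 library. The names IoSketch, counter, increment and program are made up for this illustration, and the counter is just a stand-in for some external state such as a database. The point is that an IO value is only a description: nothing happens until the runtime executes it.

```scala
import cats.effect.{IO, IOApp}
import java.util.concurrent.atomic.AtomicInteger

object IoSketch extends IOApp.Simple {
  // Hypothetical stand-in for external state (e.g. a database).
  val counter = new AtomicInteger(0)

  // Building this value does NOT touch the counter; it only describes the effect.
  val increment: IO[Int] = IO(counter.incrementAndGet())

  // Effects compose like ordinary values; the same description is executed twice here.
  val program: IO[Int] =
    for {
      a <- increment
      b <- increment
    } yield a + b

  // IOApp.Simple executes `run` on the cats-effect runtime.
  val run: IO[Unit] = program.flatMap(total => IO.println(s"total = $total"))
}
```

Because increment is a value rather than a running computation, it can be reused, retried, or combined with other effects before anything is actually executed.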
Dependencies
To start, we will need to add the required dependencies to our build.sbt file:
libraryDependencies ++= Seq(
  "io.github.kirill5k" %% "mongo4cats-core" % "0.3.0",
  "io.github.kirill5k" %% "mongo4cats-circe" % "0.3.0"
)
mongo4cats-core brings the core functionality needed to make connections to the database and for executing queries, whereas mongo4cats-circe adds an additional syntax for decoding our entities with Circe codecs (more on it later).
Connecting to The Database
To connect to our database, we will need to create an instance of MongoClient[F] first. The MongoClient[F] represents a pool of connections for a given MongoDB server deployment, and typically only one instance of this class is required per application (even with multiple operations executed concurrently). The easiest way of creating a client is by calling the fromConnectionString method:
import cats.effect.{IO, Resource}
import mongo4cats.client._

val mongoClient: Resource[IO, MongoClient[IO]] =
  MongoClient.fromConnectionString[IO]("mongodb://localhost:27017")
When creating a client using any of the MongoClient constructor methods, we get a Resource[IO, MongoClient[IO]], which ensures that the connection is closed after its use.
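The guarantee that Resource gives can be sketched without a database at all. The following example uses only cats-effect; FakeClient, fakeClient and the event log are hypothetical stand-ins invented for this illustration and are not part of mongo4cats. It records when the "client" is opened, used and closed, including the case where the code using it fails.

```scala
import cats.effect.{IO, Ref, Resource}

object ResourceSketch {
  // Hypothetical stand-in for a real client; it only records what happens to it.
  final class FakeClient(log: Ref[IO, Vector[String]]) {
    def query: IO[Unit] = log.update(_ :+ "query")
  }

  def fakeClient(log: Ref[IO, Vector[String]]): Resource[IO, FakeClient] =
    Resource.make(
      log.update(_ :+ "open").as(new FakeClient(log)) // acquire
    )(_ => log.update(_ :+ "close"))                  // release, runs even on failure

  // Events recorded when the client is used successfully.
  val events: IO[Vector[String]] =
    for {
      log    <- Ref.of[IO, Vector[String]](Vector.empty)
      _      <- fakeClient(log).use(_.query)
      result <- log.get
    } yield result

  // Events recorded when the code inside `use` fails.
  val eventsOnFailure: IO[Vector[String]] =
    for {
      log    <- Ref.of[IO, Vector[String]](Vector.empty)
      _      <- fakeClient(log).use(_ => IO.raiseError[Unit](new RuntimeException("boom"))).attempt
      result <- log.get
    } yield result
}
```

In both cases the release action runs after use completes, which is the behaviour we rely on when the real client is wrapped in a Resource.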
Once we have our client, we can then start using it for making connections to our database:
mongoClient.use { client =>
  for {
    db <- client.getDatabase("testdb")
  } yield ()
}
Data Modeling
Internally, MongoDB stores all of its data in the BSON format, which is a close cousin of the traditional JSON that we are all used to. The similarities between these two formats allow us to derive MongoDB codecs with tools that are normally used for transforming case classes into plain JSON in Scala. One such tool is Circe.
To begin with, let’s create a simple case class to model data in our collection:
import java.time.Instant
import mongo4cats.bson.ObjectId

sealed trait PaymentMethod
final case class Paypal(email: String) extends PaymentMethod
final case class CreditCard(
  name: String,
  number: String,
  expiry: String,
  cvv: Int
) extends PaymentMethod

final case class Payment(
  id: ObjectId,
  amount: BigDecimal,
  method: PaymentMethod,
  date: Instant
)
Once our data model is defined, we can get our collection from the database:
import io.circe.generic.auto._
import mongo4cats.circe._

for {
  ...
  db   <- client.getDatabase("testdb")
  coll <- db.getCollectionWithCodec[Payment]("payments")
} yield ()
Calling the MongoDatabase[F] method getCollectionWithCodec[T] requires an instance of MongoCodecProvider[T] to be available in the implicit scope, which will then be used by the collection internally for obtaining the codecs that encode and decode our entities to and from BSON documents. Luckily, since we have this import included:
import mongo4cats.circe._
the codec provider will be derived automatically with the help of the Encoder[T] and Decoder[T] instances brought in by including Circe's automatic derivation import:
import io.circe.generic.auto._
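To get a feel for what automatic derivation produces, here is a small sketch that uses Circe on its own (circe-core and circe-generic only, no MongoDB involved). CirceSketch and its values are names made up for this illustration. It shows Circe's plain JSON encoding of the PaymentMethod hierarchy; the exact BSON that mongo4cats stores is built on top of these codecs and is not shown here.

```scala
import io.circe.Decoder
import io.circe.Json
import io.circe.generic.auto._
import io.circe.syntax._

sealed trait PaymentMethod
final case class Paypal(email: String) extends PaymentMethod
final case class CreditCard(name: String, number: String, expiry: String, cvv: Int) extends PaymentMethod

object CirceSketch {
  // The static type must be the sealed trait, otherwise the subtype's own encoder is used.
  val paypal: PaymentMethod = Paypal("john.bloggs@test.com")
  val card: PaymentMethod   = CreditCard("John Bloggs", "1111222233334444", "1021", 919)

  // With automatic derivation, a sealed trait is encoded as an object keyed by the subtype name.
  val paypalJson: Json = paypal.asJson
  val cardJson: Json   = card.asJson

  // Decoding uses the same key to pick the right subtype again.
  val decoded: Decoder.Result[PaymentMethod] = paypalJson.as[PaymentMethod]
}
```

Printing CirceSketch.paypalJson.noSpaces gives {"Paypal":{"email":"john.bloggs@test.com"}}, which is why the subtype can be recovered when a document is read back.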
Now we have everything ready to start working with the data!
Inserting Documents
To insert an object into the database, simply call insertOne to insert a single document or insertMany to insert a sequence of documents:
val creditCard = CreditCard("John Bloggs", "1111222233334444", "1021", 919)
val paypal = Paypal("john.bloggs@test.com")
val payment1 = Payment(ObjectId(), BigDecimal(2.5), paypal, Instant.parse("2021-04-05T12:00:00Z"))
val payment2 = Payment(ObjectId(), BigDecimal(9.99), creditCard, Instant.parse("2021-04-12T12:00:00Z"))
for {
  ...
  _ <- coll.insertOne(payment1)
  _ <- coll.insertMany(List(payment2))
} yield ()
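For readers who want to run the fragments so far, they can be assembled into one small program roughly as follows. This is a sketch rather than a reference implementation: it assumes a MongoDB instance listening on localhost:27017, that the client is created with MongoClient.fromConnectionString as described above, and the object name InsertPayments is made up for this example. All mongo4cats calls are the ones already shown in this post.

```scala
import cats.effect.{IO, IOApp}
import io.circe.generic.auto._
import mongo4cats.bson.ObjectId
import mongo4cats.circe._
import mongo4cats.client._

import java.time.Instant

sealed trait PaymentMethod
final case class Paypal(email: String) extends PaymentMethod
final case class CreditCard(name: String, number: String, expiry: String, cvv: Int) extends PaymentMethod
final case class Payment(id: ObjectId, amount: BigDecimal, method: PaymentMethod, date: Instant)

object InsertPayments extends IOApp.Simple {
  val creditCard = CreditCard("John Bloggs", "1111222233334444", "1021", 919)
  val paypal     = Paypal("john.bloggs@test.com")

  val payment1 = Payment(ObjectId(), BigDecimal(2.5), paypal, Instant.parse("2021-04-05T12:00:00Z"))
  val payment2 = Payment(ObjectId(), BigDecimal(9.99), creditCard, Instant.parse("2021-04-12T12:00:00Z"))

  // Assumption: a local MongoDB server is reachable at this address.
  val run: IO[Unit] =
    MongoClient.fromConnectionString[IO]("mongodb://localhost:27017").use { client =>
      for {
        db   <- client.getDatabase("testdb")
        coll <- db.getCollectionWithCodec[Payment]("payments")
        _    <- coll.insertOne(payment1)
        _    <- coll.insertMany(List(payment2))
        _    <- IO.println("inserted 2 payments")
      } yield ()
    }
}
```

The client is released automatically when the block passed to use finishes, whether the inserts succeed or fail.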
Querying Documents
The most straightforward way to query a collection would be to use its find method:
import mongo4cats.collection.operations.Filter

val afterDateFilter = Filter.gte("date", Instant.parse("2021-04-01T00:00:00Z"))
val beforeDateFilter = Filter.lt("date", Instant.parse("2021-05-01T00:00:00Z"))

for {
  ...
  payments <- coll.find
    .filter(afterDateFilter && beforeDateFilter)
    .sortByDesc("amount")
    .limit(5)
    .all
} yield ()
As can be seen from the example, calling find returns a query builder, which lets you add further filters, sorts, projections and limits. The query can then be executed by calling one of:
- first: returns an option containing only the first document that matches the provided query
- all: returns a sequence containing all found documents
- stream: returns a stream in which documents are emitted as soon as they are received from the database
When building a filter using the Filter class from the mongo4cats.collection.operations package, a wide variety of additional query predicates and projections is available for excluding documents that are not relevant to the query. Moreover, these filters can be chained together using the logical operators && (and) and || (or).
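As a rough illustration of chaining, filters are ordinary values and can be built by small helper functions before being handed to a query. The sketch below uses only Filter.gte, Filter.lt, && and || as introduced above; the object FilterSketch and the helper between are hypothetical names made up for this example.

```scala
import mongo4cats.collection.operations.Filter

import java.time.Instant

object FilterSketch {
  // Hypothetical helper: matches documents whose `field` lies in [from, until).
  def between(field: String, from: Instant, until: Instant): Filter =
    Filter.gte(field, from) && Filter.lt(field, until)

  val april2021: Filter =
    between("date", Instant.parse("2021-04-01T00:00:00Z"), Instant.parse("2021-05-01T00:00:00Z"))

  val june2021: Filter =
    between("date", Instant.parse("2021-06-01T00:00:00Z"), Instant.parse("2021-07-01T00:00:00Z"))

  // Either month is acceptable.
  val aprilOrJune: Filter = april2021 || june2021
}
```

A filter built this way is passed to the query builder like any other, for example coll.find.filter(FilterSketch.aprilOrJune).first to fetch a single matching payment.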
Updating Documents
Now, let’s see how we can find a document and update some of its fields.
import mongo4cats.collection.operations.Update

val payment1Filter = Filter.idEq(payment1.id)
val amountUpdate = Update
  .set("amount", BigDecimal(5.0))
  .currentTimestamp("updatedAt")

for {
  ...
  found <- coll.findOneAndUpdate(payment1Filter, amountUpdate)
} yield ()
In this example, a new operation type was introduced: Update, which allows us to build and sequence multiple update operations that will then be executed on the documents matching the provided filter. Here we find the document whose id matches the id of payment1, update its amount, add a new updatedAt field set to the current timestamp, and return the original document. Alternatively, there are also methods for just updating one or multiple documents: updateOne and updateMany, respectively. Both of these methods have an API similar to findOneAndUpdate.
Similarly, there are also methods for replacing or deleting documents: findOneAndReplace and findOneAndDelete, respectively.
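A sketch of how updateOne and updateMany might be used is shown below. It assumes that both take a filter and an update in the same way as findOneAndUpdate, and that they return the underlying Java driver's UpdateResult (from which getModifiedCount is read); treat both points as assumptions to check against the library version you use. UpdateSketch, updateAmount and updateAllBefore are names made up for this example.

```scala
import cats.effect.IO
import io.circe.generic.auto._
import mongo4cats.bson.ObjectId
import mongo4cats.circe._
import mongo4cats.client._
import mongo4cats.collection.operations.{Filter, Update}

import java.time.Instant

sealed trait PaymentMethod
final case class Paypal(email: String) extends PaymentMethod
final case class CreditCard(name: String, number: String, expiry: String, cvv: Int) extends PaymentMethod
final case class Payment(id: ObjectId, amount: BigDecimal, method: PaymentMethod, date: Instant)

object UpdateSketch {
  // Same update as in the findOneAndUpdate example.
  val amountUpdate = Update
    .set("amount", BigDecimal(5.0))
    .currentTimestamp("updatedAt")

  // Updates the single payment with the given id; yields the number of modified documents.
  def updateAmount(client: MongoClient[IO], id: ObjectId): IO[Long] =
    for {
      db     <- client.getDatabase("testdb")
      coll   <- db.getCollectionWithCodec[Payment]("payments")
      result <- coll.updateOne(Filter.idEq(id), amountUpdate)
    } yield result.getModifiedCount // assumption: result is the Java driver's UpdateResult

  // Applies the same update to every payment dated before `cutoff`.
  def updateAllBefore(client: MongoClient[IO], cutoff: Instant): IO[Long] =
    for {
      db     <- client.getDatabase("testdb")
      coll   <- db.getCollectionWithCodec[Payment]("payments")
      result <- coll.updateMany(Filter.lt("date", cutoff), amountUpdate)
    } yield result.getModifiedCount
}
```

Unlike findOneAndUpdate, these methods do not hand back the document itself, so they are a better fit when only the outcome of the update matters.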
Streaming
As was mentioned above, when building a find query, there are options for returning the result in the form of a single element, a sequence of elements, or an FS2 Stream. FS2 is a library for purely functional, effectful, and polymorphic stream processing. FS2 is built upon Cats and Cats-Effect, while its core types (streams and pulls) are polymorphic in the effect type (as long as it is compatible with the cats-effect type classes), and thus can be used with other effect libraries.
for {
  ...
  total <- coll.find
    .stream
    .map(_.amount)
    .fold(BigDecimal(0))(_ + _)
    .compile
    .last
} yield ()
In the presented example, we are streaming all documents from our collection and calculating the total amount paid.
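The shape of the result is easier to see on a stream that does not need a database. The sketch below uses only FS2 and cats-effect; the amounts stream is a made-up stand-in for coll.find.stream.map(_.amount), and FoldSketch is a hypothetical name. It shows that compile.last yields an Option, and that fold still emits its initial value when the stream is empty.

```scala
import cats.effect.IO
import fs2.Stream

object FoldSketch {
  // Stand-in for the amounts coming from the collection.
  val amounts: Stream[IO, BigDecimal] =
    Stream(BigDecimal(2.5), BigDecimal(9.99)).covary[IO]

  // fold emits a single element (the running total at the end); last picks it up.
  val total: IO[Option[BigDecimal]] =
    amounts.fold(BigDecimal(0))(_ + _).compile.last

  // For an empty stream, fold still emits the initial value, so the result is Some(0).
  val totalOfNothing: IO[Option[BigDecimal]] =
    Stream.emits(List.empty[BigDecimal]).covary[IO].fold(BigDecimal(0))(_ + _).compile.last
}
```

Since the Option is always defined after a fold, it can safely be unwrapped with something like total.map(_.getOrElse(BigDecimal(0))).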
Note that calling stream creates an unbounded stream that will pull as many documents as there are, until the collection is exhausted or the memory gets full. If there are a lot of documents in the collection, creating a stream like this might lead to out-of-memory failures. To avoid this, there is an alternative method available, boundedStream(capacity), which holds at most capacity elements at a time.
Conclusion
In this blog post, I tried to highlight the most common cases when working with MongoDB and to show how easy it is to use with an effect system like Cats-Effect through mongo4cats. The examples presented here only cover the most basic and common parts; for more advanced usage patterns, you can refer to the project's GitHub repository or the official MongoDB documentation.
Published at DZone with permission of Kirill Bibik. See the original article here.