Exploring Apache Ignite With Spring Boot
Discuss distributed caching with Apache Ignite and Spring Boot, Ignite’s distributed locks, Spring Data support, and code-deployment to invoke remote code execution.
For the use cases I am going to describe here, I have 2 services:
- courses-service provides CRUD operations for courses and instructors.
- reviews-service is another CRUD provider, dealing with reviews for courses; it is totally agnostic of the courses in courses-service.
Both apps are written in Kotlin using Spring Boot and other libraries. With these 2 services, we are going to discuss distributed caching with Apache Ignite and Spring Boot, and we'll see how we can use code deployment to invoke code remotely on a service via Apache Ignite.
Spoiler alert: the examples and use cases presented here are designed purely to demonstrate integration with some of Apache Ignite's capabilities. The problems discussed here can be solved in various ways, maybe even better ones, so don't spend too much time wondering "why." So, without further ado, let's dive into the code.
Note: here is the source code in case you want to follow along.
Simple Distributed Caching
We'll focus on courses-service for now, which has this entity:
@Entity
@Table(name = "courses")
class Course(
var name: String,
@Column(name = "programming_language")
var programmingLanguage: String,
@Column(name = "programming_language_description", length = 3000, nullable = true)
var programmingLanguageDescription: String? = null,
@Enumerated(EnumType.STRING)
var category: Category,
@ManyToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "instructor_id")
var instructor: Instructor? = null
) : AbstractEntity() {
override fun toString(): String {
return "Course(id=$id, name='$name', category=$category)"
}
}
And this method in CourseServiceImpl:
@Transactional
override fun save(course: Course): Course {
return courseRepository.save(course)
}
I want to enrich every saved course with a description of the programming language sent by the user. For this, I created a Wikipedia API client that makes the following request every time a new course is added:
GET https://en.wikipedia.org/api/rest_v1/page/summary/java_(programming_language)
So, my method looks like this now:
@Transactional
override fun save(course: Course): Course {
enhanceWithProgrammingLanguageDescription(course)
return courseRepository.save(course)
}
private fun enhanceWithProgrammingLanguageDescription(course: Course) {
wikipediaApiClient.fetchSummaryFor("${course.programmingLanguage}_(programming_language)")?.let { course.programmingLanguageDescription = it.summary }
}
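The WikipediaApiClient implementation isn't shown here, so the following is only a sketch, assuming the client builds the REST summary URL from the course's language name. The helper summaryUrlFor is a hypothetical name. It highlights one detail worth handling: names like C++ or C# contain characters that have to be percent-encoded in the URL path.

```kotlin
import java.net.URLEncoder
import java.nio.charset.StandardCharsets

// Base endpoint taken from the GET request shown above.
val summaryEndpoint = "https://en.wikipedia.org/api/rest_v1/page/summary/"

// Hypothetical helper: builds the summary URL for a programming language name.
// Wikipedia titles use underscores instead of spaces, so spaces are replaced first;
// URLEncoder then percent-encodes characters such as '+', '#', '(' and ')'.
fun summaryUrlFor(programmingLanguage: String): String {
    val title = "${programmingLanguage.trim().replace(' ', '_')}_(programming_language)"
    return summaryEndpoint + URLEncoder.encode(title, StandardCharsets.UTF_8)
}
```

Percent-encoded parentheses should be equivalent to the literal ones in the request above, but it is worth verifying against the live API before relying on it.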
That's great. Now here comes our use case: we want to cache the Wikipedia responses so we don't call the Wikipedia API every time. Our courses will mostly target a set of popular programming languages like Java, Kotlin, and C#. We don't want to slow down our save by querying every time for mostly the same language. A cache can also act as a safeguard, at least for languages already cached, in case the API server is down.
Time to introduce Apache Ignite!
Apache Ignite is a distributed database for high-performance computing with in-memory speed. Data in Ignite is stored in-memory and/or on-disk, and is either partitioned or replicated across a cluster of multiple nodes. This provides scalability, performance, and resiliency.
You can read about the many scenarios where Apache Ignite is an appropriate solution, and about all its advantages, on the project's FAQ page.
Integrating a Spring Boot app with Apache Ignite (embedded) is quite straightforward, but there is a catch: it has some corner cases that we are going to discuss, especially when you want, let's say, code deployment on Java 17 or Spring Data. Apache Ignite can be configured via XML or programmatically; I picked the programmatic way.
Here are the dependencies:
implementation("org.apache.ignite:ignite-core:2.15.0")
implementation("org.apache.ignite:ignite-kubernetes:2.15.0")
implementation("org.apache.ignite:ignite-indexing:2.15.0")
implementation("org.apache.ignite:ignite-spring-boot-autoconfigure-ext:1.0.0")
Here is the configuration that we are going to add to courses-service:
@Configuration
@Profile("!test")
@EnableConfigurationProperties(value = [IgniteProperties::class])
class IgniteConfig(val igniteProperties: IgniteProperties) {
@Bean(name = ["igniteInstance"])
fun igniteInstance(ignite: Ignite): Ignite {
return ignite
}
@Bean
fun configurer(): IgniteConfigurer {
return IgniteConfigurer { igniteConfiguration: IgniteConfiguration ->
igniteConfiguration.setIgniteInstanceName(igniteProperties.instanceName)
igniteConfiguration.setDiscoverySpi(configureDiscovery()) // allow possibility to switch to Kubernetes
}
}
private fun configureDiscovery(): TcpDiscoverySpi {
val spi = TcpDiscoverySpi()
var ipFinder: TcpDiscoveryIpFinder? = null
if (igniteProperties.discovery.tcp.enabled) {
ipFinder = TcpDiscoveryMulticastIpFinder()
ipFinder.setMulticastGroup(DFLT_MCAST_GROUP)
} else if (igniteProperties.discovery.kubernetes.enabled) {
ipFinder = TcpDiscoveryKubernetesIpFinder()
ipFinder.setNamespace(igniteProperties.discovery.kubernetes.namespace)
ipFinder.setServiceName(igniteProperties.discovery.kubernetes.serviceName)
}
spi.setIpFinder(ipFinder)
return spi
}
}
First, as you might have noticed, there is the IgniteProperties class, which I created to allow flexible configuration based on the profile. In my case, the local profile uses multicast discovery, and prod uses Kubernetes discovery. This class is not mandatory, though.
@ConstructorBinding
@ConfigurationProperties(prefix = "ignite")
data class IgniteProperties(
val instanceName: String,
val discovery: DiscoveryProperties = DiscoveryProperties()
)
@ConstructorBinding
data class DiscoveryProperties(
val tcp: TcpProperties = TcpProperties(),
val kubernetes: KubernetesProperties = KubernetesProperties()
)
@ConstructorBinding
data class TcpProperties(
val enabled: Boolean = false,
val host: String = "localhost"
)
data class KubernetesProperties(
val enabled: Boolean = false,
val namespace: String = "evil-inc",
val serviceName: String = "course-service"
)
And here are its corresponding values from application.yaml:
ignite:
  instanceName: ${spring.application.name}-server-${random.uuid}
  discovery:
    tcp:
      enabled: true
      host: localhost
    kubernetes:
      enabled: false
      namespace: evil-inc
      service-name: course-service
Then we define a bean named igniteInstance, which is going to be our main entry point for all Ignite APIs. Via the IgniteConfigurer provided by ignite-spring-boot-autoconfigure-ext:1.0.0, we start configuring our igniteInstance and give it a name picked up from the properties. Then we configure the discovery service provider interface (SPI) via TcpDiscoverySpi. As I mentioned earlier, based on the provided properties, I use either TcpDiscoveryMulticastIpFinder or TcpDiscoveryKubernetesIpFinder. With this, our basic configuration is done, and we can start it!
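The branching in configureDiscovery() can be pulled out into a plain function so it can be unit-tested without starting Ignite. The sketch below is a hypothetical refactoring, not the code from the repository: it mirrors the property classes without their Spring annotations, and the Discovery type and resolveDiscovery() are illustrative names. Like the original if/else-if, TCP multicast wins when both flags are set. Unlike the original, which would pass a null IP finder to setIpFinder when neither flag is set, it makes that case explicit.

```kotlin
// Simplified copies of the article's property classes (Spring annotations omitted).
data class TcpProperties(val enabled: Boolean = false, val host: String = "localhost")

data class KubernetesProperties(
    val enabled: Boolean = false,
    val namespace: String = "evil-inc",
    val serviceName: String = "course-service"
)

data class DiscoveryProperties(
    val tcp: TcpProperties = TcpProperties(),
    val kubernetes: KubernetesProperties = KubernetesProperties()
)

// Hypothetical result type: which IP finder configureDiscovery() should build.
sealed interface Discovery {
    object Multicast : Discovery
    data class Kubernetes(val namespace: String, val serviceName: String) : Discovery
    object None : Discovery
}

// Same precedence as the if/else-if in configureDiscovery(): TCP first, then Kubernetes.
fun resolveDiscovery(props: DiscoveryProperties): Discovery = when {
    props.tcp.enabled -> Discovery.Multicast
    props.kubernetes.enabled ->
        Discovery.Kubernetes(props.kubernetes.namespace, props.kubernetes.serviceName)
    else -> Discovery.None
}
```

configureDiscovery() would then only translate each Discovery case into the matching TcpDiscoveryIpFinder, and could fail fast (or fall back to a default) on Discovery.None.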
Not so fast!
Apache Ignite's SQL engine is backed by an H2 in-memory database, and in the Spring Boot realm, the H2 version is managed for you automatically. This is as much a blessing as it is a curse, because Ignite supports only a specific H2 version, so we need to declare it explicitly in our build.gradle like this:
ext["h2.version"] = "1.4.197"
Also, if, like me, you're running on Java 17, you might get this exception:
Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.ignite.IgniteJdbcThinDriver
To address this exception, we have to add the following VM arguments to our run configuration:
--add-opens=jdk.management/com.sun.management.internal=ALL-UNNAMED
--add-opens=java.base/jdk.internal.misc=ALL-UNNAMED
--add-opens=java.base/sun.nio.ch=ALL-UNNAMED
--add-opens=java.management/com.sun.jmx.mbeanserver=ALL-UNNAMED
--add-opens=jdk.internal.jvmstat/sun.jvmstat.monitor=ALL-UNNAMED
--add-opens=java.base/sun.reflect.generics.reflectiveObjects=ALL-UNNAMED
--add-opens=java.base/java.io=ALL-UNNAMED
--add-opens=java.base/java.nio=ALL-UNNAMED
--add-opens=java.base/java.util=ALL-UNNAMED
--add-opens=java.base/java.util.concurrent=ALL-UNNAMED
--add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED
--add-opens=java.base/java.lang=ALL-UNNAMED
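If you are unsure whether these flags actually reached the JVM (for example, when switching between an IDE run configuration and Gradle), a small startup check can help. This is a hypothetical diagnostic, not part of the article's code: it uses the standard java.lang.Module API to report which of a few of the packages listed above are not opened to classpath code (ALL-UNNAMED). The subset of packages is illustrative; extend it to the full list as needed.

```kotlin
// A few (module, package) pairs from the --add-opens list above.
val requiredOpens: List<Pair<String, String>> = listOf(
    "java.base" to "java.lang",
    "java.base" to "java.nio",
    "java.base" to "java.util",
    "java.base" to "sun.nio.ch",
    "java.management" to "com.sun.jmx.mbeanserver"
)

// True if packageName in moduleName is open to code loaded from the classpath.
fun isOpenToClasspath(moduleName: String, packageName: String): Boolean {
    val target = ModuleLayer.boot().findModule(moduleName).orElse(null) ?: return false
    // Classpath classes belong to an unnamed module; ALL-UNNAMED opens to every unnamed module.
    val classpathModule = ClassLoader.getSystemClassLoader().unnamedModule
    return target.isOpen(packageName, classpathModule)
}

// Returns the flags that still seem to be missing, formatted like the VM arguments above.
fun missingOpens(): List<String> =
    requiredOpens
        .filterNot { (module, pkg) -> isOpenToClasspath(module, pkg) }
        .map { (module, pkg) -> "--add-opens=$module/$pkg=ALL-UNNAMED" }
```

Logging a warning at startup when missingOpens() is non-empty points directly at the flag to add, instead of the less obvious NoClassDefFoundError.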
Now we can start it!
INFO 11116 --- [4-6ceb9d7d547b%] o.a.i.i.m.d.GridDiscoveryManager : Topology snapshot [ver=2, locNode=9087c6ef, servers=1, clients=0, state=ACTIVE, CPUs=16, offheap=6.3GB, heap=4.0GB …
INFO 11116 --- [4-6ceb9d7d547b%] o.a.i.i.m.d.GridDiscoveryManager : ^-- Baseline [id=0, size=1, online=1, offline=0]
INFO 32076 --- [ main] o.a.i.s.c.tcp.TcpCommunicationSpi : Successfully bound communication NIO server to TCP port [port=47100, locHost=0.0.0.0/0.0.0.0, selectorsCnt=8, selectorSpins=0, pairedConn=false]
INFO 32076 --- [ main] o.a.i.spi.discovery.tcp.TcpDiscoverySpi : Successfully bound to TCP port [port=47500, localHost=0.0.0.0/0.0.0.0, locNodeId=84e5553d-a7a9-46d9-a98c-81f34bf84673]
Once you see these logs, Ignite is up and running. The topology snapshot states that there is one server and no clients, and the last two lines show that communication and discovery bound to ports 47100 and 47500, respectively.
You might also have noticed some warnings like these in the logs. Let's see how to get rid of them:
1.
^-- Set max direct memory size if getting 'OOME: Direct buffer memory' (add '-XX:MaxDirectMemorySize=<size>[g|G|m|M|k|K]' to JVM options)
Add the following VM argument: -XX:MaxDirectMemorySize=256m
2.
^-- Specify JVM heap max size (add '-Xmx<size>[g|G|m|M|k|K]' to JVM options)
Add the following VM arguments:
-Xms512m
-Xmx2g
3.
Metrics for local node (to disable set 'metricsLogFrequency' to 0)
This one is not really an issue, and it can be handy during development, but for now it just spams the logs, which I don't like, so we're going to disable it by adding this line to our configurer:
igniteConfiguration.setMetricsLogFrequency(0)
4.
Message queue limit is set to 0 which may lead to potential OOMEs
This one complains about the parameter that limits the queue of incoming and outgoing messages. Its default value is 0, which means unlimited. So we are going to set a limit by configuring TcpCommunicationSpi like this:
igniteConfiguration.setCommunicationSpi(configureTcpCommunicationSpi())
private fun configureTcpCommunicationSpi(): TcpCommunicationSpi {
val tcpCommunicationSpi = TcpCommunicationSpi()
tcpCommunicationSpi.setMessageQueueLimit(1024)
return tcpCommunicationSpi
}
Okay, now that everything is set up, we can move on. Let's configure a cache in the IgniteConfig class and see how we can solve our Wikipedia response caching problem. In Apache Ignite, we can configure a cache at the configuration level or at runtime (at runtime, you can also use a template). For this demo, I'll show you how to configure it at the configuration level.
@Bean
fun configurer(): IgniteConfigurer {
return IgniteConfigurer { igniteConfiguration: IgniteConfiguration ->
igniteConfiguration.setIgniteInstanceName(igniteProperties.instanceName)
igniteConfiguration.setDiscoverySpi(configureDiscovery())
igniteConfiguration.setMetricsLogFrequency(0)
igniteConfiguration.setCommunicationSpi(configureTcpCommunicationSpi())
igniteConfiguration.setCacheConfiguration(wikipediaSummaryCacheConfiguration()) //vararg
}
}
Again, our entry point for configuring Ignite is IgniteConfiguration: igniteConfiguration.setCacheConfiguration accepts one or more CacheConfiguration instances, since it takes a vararg.
private fun wikipediaSummaryCacheConfiguration(): CacheConfiguration<String, WikipediaApiClientImpl.WikipediaSummary> {
val wikipediaCache = CacheConfiguration<String, WikipediaApiClientImpl.WikipediaSummary>(WIKIPEDIA_SUMMARIES)
wikipediaCache.setIndexedTypes(String::class.java, WikipediaApiClientImpl.WikipediaSummary::class.java)
wikipediaCache.setEagerTtl(true)
wikipediaCache.setCacheMode(CacheMode.REPLICATED)
wikipediaCache.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_ASYNC)
wikipediaCache.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
wikipediaCache.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(Duration(TimeUnit.MINUTES, 60)))
return wikipediaCache
}
wikipediaSummaryCacheConfiguration returns a CacheConfiguration<String, WikipediaApiClientImpl.WikipediaSummary>: as per our requirement, one Wikipedia summary per programming language. This class defines a grid cache configuration, that is, all the parameters required to start a cache within a grid instance. Now let's see how we configured it:
- setIndexedTypes(): specifies an array of key and value types that will be indexed.
- setEagerTtl(): when set to true, Ignite proactively removes expired cache entries.
- setExpiryPolicyFactory(): here, CreatedExpiryPolicy makes entries expire 60 minutes after they were created.
- setCacheMode(): in REPLICATED mode, all keys are distributed to every participating node. The default mode is PARTITIONED, where keys are divided into partitions and distributed among nodes; there, you can also control the number of backup copies using setBackups() and specify the partition loss policy.
- setWriteSynchronizationMode(): determines whether Ignite waits for write or commit responses from other nodes. The default is PRIMARY_SYNC, where Ignite waits for the primary node to complete the write or commit, but not for backups to update. We use FULL_ASYNC, where Ignite waits for neither.
- setAtomicityMode(): setting this to TRANSACTIONAL enables fully ACID-compliant transactions for key-value operations. In contrast, ATOMIC mode disables distributed transactions and locking, providing higher performance but sacrificing transactional features.
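To make the expiry settings more concrete, here is a tiny single-JVM analogy of what CreatedExpiryPolicy with a 60-minute Duration means: an entry's deadline is fixed when it is created, and neither reads nor updates extend it. This is not how Ignite implements expiry (Ignite's is distributed, and eager TTL removes entries in the background). CreatedTtlCache and its injectable clock are hypothetical, chosen so the behavior can be tested without waiting an hour.

```kotlin
import java.time.Duration
import java.time.Instant

// Hypothetical, non-thread-safe analogy of JCache's CreatedExpiryPolicy:
// the expiry deadline is set on creation; reads and updates do not extend it.
class CreatedTtlCache<K : Any, V : Any>(
    private val ttl: Duration,
    private val clock: () -> Instant = Instant::now // injectable for tests
) {
    private data class Entry<V>(val value: V, val expiresAt: Instant)

    private val store = HashMap<K, Entry<V>>()

    private fun isExpired(entry: Entry<V>) = !clock().isBefore(entry.expiresAt)

    operator fun get(key: K): V? {
        val entry = store[key] ?: return null
        return if (isExpired(entry)) null else entry.value
    }

    operator fun set(key: K, value: V) {
        val current = store[key]
        // An update keeps the original deadline; a (re)creation starts a new one.
        val expiresAt = if (current != null && !isExpired(current)) current.expiresAt else clock().plus(ttl)
        store[key] = Entry(value, expiresAt)
    }

    // Rough counterpart of eager TTL: proactively drop expired entries.
    fun evictExpired() {
        store.entries.removeIf { isExpired(it.value) }
    }

    val size: Int get() = store.size
}
```

Without evictExpired(), expired entries would still occupy memory until touched, which is the trade-off that setEagerTtl() controls in Ignite.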
Having this configuration, all that's left is to adjust our enhanceWithProgrammingLanguageDescription method to cache fetched Wikipedia summaries:
private fun enhanceWithProgrammingLanguageDescription(course: Course) {
val summaries = igniteInstance.cache<String, WikipediaApiClientImpl.WikipediaSummary>(WIKIPEDIA_SUMMARIES)
log.debug("Fetched ignite cache [$WIKIPEDIA_SUMMARIES] = size(${summaries.size()})]")
summaries[course.programmingLanguage]?.let {
log.debug("Cache value found, using cache's response $it to update $course programming language description")
course.programmingLanguageDescription = it.summary
} ?: wikipediaApiClient.fetchSummaryFor("${course.programmingLanguage}_(programming_language)")?.let {
log.debug("No cache value found, using wikipedia's response $it to update $course programming language description")
summaries.putIfAbsent(course.programmingLanguage, it)
it
}?.let { course.programmingLanguageDescription = it.summary }
}
Basically, we use the Ignite instance bean to retrieve our configured cache. Each Ignite instance is a member (server and/or client) of an Apache Ignite cluster. After getting hold of the replicated cache, it is a matter of a few simple checks: if we have a summary for the programming language key in our map, we use it. If not, we fetch it from the Wikipedia API, add it to the map, and use it.
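This lookup is the classic cache-aside pattern. Here is a minimal, Ignite-free sketch of the same flow over a java.util.concurrent.ConcurrentMap, assuming a fetch function that may return null (as the ?.let chain around fetchSummaryFor suggests). getOrFetch is a hypothetical name. One small design choice differs from the article's code: if another caller stored a value between our miss and our write, the sketch returns that stored value, so concurrent requests end up using the same cached summary.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentMap

// Cache-aside: read from the cache first; on a miss, fetch from the source and store the result.
// Returns null only when the key is not cached and the source has nothing either.
fun <K : Any, V : Any> getOrFetch(cache: ConcurrentMap<K, V>, key: K, fetch: (K) -> V?): V? {
    cache[key]?.let { return it } // cache hit
    val fetched = fetch(key) ?: return null // source miss: nothing to cache
    // putIfAbsent returns the existing value if another caller won the race.
    return cache.putIfAbsent(key, fetched) ?: fetched
}
```

Note that on ConcurrentMap, putIfAbsent returns the previous value, whereas on IgniteCache it returns a Boolean; if you need the previous value from an Ignite cache, getAndPutIfAbsent is the method to look at.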
Now let’s see it in action. If we execute the following HTTP request:
###
POST http://localhost:8080/api/v1/courses
Content-Type: application/json
{
"name": "C++ Development",
"category": "TUTORIAL",
"programmingLanguage" : "C++",
"instructor": {
"name": "Bjarne Stroustrup"
}
}
We’ll see in the logs:
DEBUG 32076 --- [nio-8080-exec-1] i.e.c.s.i.CourseServiceImpl$Companion : Fetched ignite cache [WIKIPEDIA_SUMMARIES] = size(0)]
DEBUG 32076 --- [nio-8080-exec-1] i.e.c.s.i.CourseServiceImpl$Companion : No cache value found, using wikipedia's response
We retrieved the previously configured cache for Wikipedia summaries, but its size is 0. Therefore, the description came from Wikipedia's API. Now, if we execute the same request again, we'll notice different behavior:
DEBUG 32076 --- [nio-8080-exec-2] i.e.c.s.i.CourseServiceImpl$Companion : Fetched ignite cache [WIKIPEDIA_SUMMARIES] = size(1)]
DEBUG 32076 --- [nio-8080-exec-2] i.e.c.s.i.CourseServiceImpl$Companion : Cache value found, using cache's response…
Now the cache has size 1, and since it was populated by our previous request, no request to Wikipedia's API is made. However, what truly highlights the elegance and ease of Apache Ignite's integration is launching another instance of our application on a different port with the -Dserver.port=8060 option. This is where we can see the replicated cache in action.
INFO 37600 --- [ main] o.a.i.s.c.tcp.TcpCommunicationSpi : Successfully bound communication NIO server to TCP port [port=47101, locHost=0.0.0.0/0.0.0.0, selectorsCnt=8, selectorSpins=0, pairedConn=false]
INFO 37600 --- [ main] o.a.i.spi.discovery.tcp.TcpDiscoverySpi : Successfully bound to TCP port [port=47501, localHost=0.0.0.0/0.0.0.0, locNodeId=4770d2ff-2979-4b4b-8d0e-30565aeff75e]
INFO 37600 --- [1-d0db3c4f0d78%] a.i.i.p.c.d.d.p.GridDhtPartitionDemander : Starting rebalance routine [WIKIPEDIA_SUMMARIES]
INFO 37600 --- [ main] o.a.i.i.m.d.GridDiscoveryManager : Topology snapshot [ver=6, locNode=4770d2ff, servers=2, clients=0, state=ACTIVE, CPUs=16, offheap=13.0GB, heap=4.0GB...
INFO 37600 --- [ main] o.a.i.i.m.d.GridDiscoveryManager : ^-- Baseline [id=0, size=2, online=2, offline=0]
We see that our TcpDiscoveryMulticastIpFinder discovered the Apache Ignite node already running on ports 47100/47500 alongside our first courses-service instance on port 8080. Since those ports are taken, the new node binds to ports 47101/47501 and joins the cluster. This triggers the rebalancing routine for our cache. In the end, the topology log line shows that there are now 2 servers. Now, if we make a new HTTP request to create the same course on the 8060 instance, we'll see the following:
DEBUG 37600 --- [nio-8060-exec-2] i.e.c.s.i.CourseServiceImpl$Companion : Fetched ignite cache [WIKIPEDIA_SUMMARIES] = size(1)]
DEBUG 37600 --- [nio-8060-exec-2] i.e.c.s.i.CourseServiceImpl$Companion : Cache value found, using cache's response
So, we used the same cache, which has size 1, and no requests to Wikipedia's API were made. As you might expect, it works the other way around too: if we make requests on 8060 for another language, the cached entry will also be used on 8080 for requests for that language.
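To picture the difference between the REPLICATED mode used here and the default PARTITIONED mode, the toy model below decides which nodes hold a key. It uses rendezvous (highest-random-weight) hashing, the idea behind Ignite's default RendezvousAffinityFunction, but it is heavily simplified: real Ignite first maps keys to partitions and then partitions to nodes, and the function names and hash mixing here are hypothetical. In replicated mode, every node holds every key, which is why the 8060 instance could serve the entry cached through 8080.

```kotlin
// Toy model of where a key lives in a cluster (not Ignite's actual algorithm).

// Deterministic score for a (node, key) pair; the highest score wins.
fun score(node: String, key: String): Int {
    var h = 17
    h = 31 * h + node.hashCode()
    h = 31 * h + key.hashCode()
    // murmur3 fmix32 finalizer: spreads small input differences across all bits.
    h = h xor (h ushr 16)
    h *= 0x85ebca6b.toInt()
    h = h xor (h ushr 13)
    h *= 0xc2b2ae35.toInt()
    h = h xor (h ushr 16)
    return h
}

// PARTITIONED-like: one primary plus `backups` extra owners, chosen by rendezvous hashing.
fun partitionedOwners(key: String, nodes: List<String>, backups: Int): List<String> =
    nodes.sortedByDescending { score(it, key) }.take(1 + backups)

// REPLICATED-like: every node owns every key.
fun replicatedOwners(key: String, nodes: List<String>): List<String> = nodes
```

A useful property of rendezvous hashing is that when a node that does not own a key leaves the cluster, that key keeps its owner, so only the departed node's keys need to move.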
Spring Data Support
A rather surprising feature of Apache Ignite is its Spring Data support, which lets us interact with our cache in a more elegant, familiar way. The Spring Data framework offers a widely adopted API that abstracts the underlying data storage from the application layer. Apache Ignite integrates with Spring Data by implementing the Spring Data CrudRepository interface, which makes our application's data layer more flexible.
Let’s configure it by adding the following dependency:
implementation("org.apache.ignite:ignite-spring-data-ext:2.0.0")
Let's declare our repository by extending IgniteRepository:
@Repository
@RepositoryConfig(cacheName = WIKIPEDIA_SUMMARIES)
interface WikipediaSummaryRepository : IgniteRepository<WikipediaApiClientImpl.WikipediaSummary, String>
Having both Ignite's Spring Data support and Spring Data JPA on the classpath might cause bean scanning issues, which we can address by explicitly telling both JPA and Ignite where to look for their repositories, like this:
@EnableIgniteRepositories(basePackages = ["inc.evil.coursecatalog.ignite"])
@EnableJpaRepositories(
basePackages = ["inc.evil.coursecatalog.repo"],
excludeFilters = [ComponentScan.Filter(type = FilterType.ANNOTATION, value = [RepositoryConfig::class])]
)
With this configuration, Ignite scans for its repositories only in the ignite package, while JPA scans only in the repo package and excludes any class annotated with @RepositoryConfig.
Now let's refactor our CourseServiceImpl so it uses the newly created WikipediaSummaryRepository:
private fun enhanceWithProgrammingLanguageDescription(course: Course) {
val summaries = wikipediaSummaryRepository.cache()
log.debug("Fetched ignite cache [$WIKIPEDIA_SUMMARIES] = size(${summaries.size()})]")
wikipediaSummaryRepository.findById(course.programmingLanguage).orElseGet {
wikipediaApiClient.fetchSummaryFor("${course.programmingLanguage}_(programming_language)")?.let {
log.debug("No cache value found, using wikipedia's response $it to update $course programming language description")
wikipediaSummaryRepository.save(course.programmingLanguage, it)
it
}
}?.let { course.programmingLanguageDescription = it.summary }
}
Instead of interacting directly with the low-level cache/map, we now direct our requests to a high-level repository, WikipediaSummaryRepository. This approach is not only more elegant to implement and use, but also resonates much better with Spring fans, doesn't it? You might also have noticed that we no longer need the igniteInstance to access the cache: the repository gives it to us via its .cache() method, so even when using IgniteRepository, we don't lose access to the cache and its low-level operations. If we play with it the same way we did with the cache, we'll notice that the behavior hasn't changed.
But wait, there's more! Integration with Spring Data brings an abundance of advantages: query abstraction and generation, manual queries, pagination and sorting, projections, queries returning Cache.Entry or entity-like types; you name it, IgniteRepository has it. To experiment with these, I will use a CommandLineRunner, since I don't expose any API that integrates directly with WikipediaSummaryRepository.
First, let’s write some queries:
@Repository
@RepositoryConfig(cacheName = WIKIPEDIA_SUMMARIES)
interface WikipediaSummaryRepository : IgniteRepository<WikipediaSummary, String> {
fun findByTitle(title: String): List<WikipediaSummary>
fun findByDescriptionContains(keyword: String): List<Cache.Entry<String, WikipediaSummary>>
@Query(value = "select description, count(description) as \"count\" from WIKIPEDIA_SUMMARIES.WIKIPEDIASUMMARY group by description")
fun countPerDescription(): List<CountPerProgrammingLanguageType>
interface CountPerProgrammingLanguageType {
fun getDescription(): String
fun getCount(): Int
}
}
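To reason about what each repository method returns, it can help to write out roughly equivalent in-memory operations. The sketch below is only an approximation over a plain map of a simplified WikipediaSummary (Jackson and Ignite annotations omitted). Ignite actually translates these methods to SQL, so details such as LIKE matching and case sensitivity are decided by its SQL engine, not by this code. The function and class names are hypothetical.

```kotlin
// Simplified copy of the cached value type.
data class WikipediaSummary(val title: String, val description: String, val summary: String)

// Mirrors the projection returned by countPerDescription().
data class CountPerDescription(val description: String, val count: Int)

// ~ findByTitle(title): exact match on the title column.
fun findByTitle(cache: Map<String, WikipediaSummary>, title: String): List<WikipediaSummary> =
    cache.values.filter { it.title == title }

// ~ findByDescriptionContains(keyword): key/value pairs whose description contains the keyword.
fun findByDescriptionContains(
    cache: Map<String, WikipediaSummary>,
    keyword: String
): List<Map.Entry<String, WikipediaSummary>> =
    cache.entries.filter { it.value.description.contains(keyword) }

// ~ select description, count(description) ... group by description
fun countPerDescription(cache: Map<String, WikipediaSummary>): List<CountPerDescription> =
    cache.values.groupingBy { it.description }.eachCount()
        .map { (description, count) -> CountPerDescription(description, count) }
```

Since derived queries like findByTitle follow Spring Data's exact-equality convention, a title such as "Kotlin (programming language)" would not match an argument of just "Kotlin".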
And here is the CommandLineRunner:
@Bean
fun init(client: WikipediaApiClient, repo: WikipediaSummaryRepository): CommandLineRunner = CommandLineRunner {
run {
client.fetchSummaryFor("Java programming language")?.let { repo.save("Java", it) }
client.fetchSummaryFor("Kotlin programming language")?.let { repo.save("Kotlin", it) }
client.fetchSummaryFor("C++")?.let { repo.save("C++", it) }
client.fetchSummaryFor("Python programming language")?.let { repo.save("C#", it) }
client.fetchSummaryFor("Javascript")?.let { repo.save("Javascript", it) }
repo.findAll().forEach { log.info("Fetched {}", it) }
repo.findByTitle("Kotlin").forEach { log.info("Fetched by title {}", it) }
repo.findByDescriptionContains("programming language").forEach { log.info(" Fetched by description {}", it) }
repo.countPerDescription().forEach { log.info("Count per description {}", it) }
}
}
Before we can run it, we have to adjust our cached entity a bit, like this:
@JsonIgnoreProperties(ignoreUnknown = true)
data class WikipediaSummary(
@JsonProperty("title")
@QuerySqlField(name = "title", index = true)
val title: String,
@JsonProperty("description")
@QuerySqlField(name = "description", index = false)
val description: String,
@JsonProperty("extract")
@QuerySqlField(name = "summary", index = false)
val summary: String
)
You might notice @QuerySqlField on each of the fields: every field involved in SQL clauses must have this annotation. It instructs Ignite to create a column for each field; otherwise, the whole payload ends up in a single huge column. This is a bit intrusive, but it is a small price to pay for the plethora of possibilities we gain. Now, once we run it, we get the following log lines:
INFO 3252 --- [ main] i.e.c.CourseCatalogApplication$Companion : Fetched WikipediaSummary(title=Python (programming language)…
…
INFO 3252 --- [ main] i.e.c.CourseCatalogApplication$Companion : Fetched by description Entry [key=C#, val=WikipediaSummary(title=Python (programming language)…
…
INFO 3252 --- [ main] i.e.c.CourseCatalogApplication$Companion : Count per description {count=1, description=General-purpose programming language derived from Java}
…
This proves that our implementation works as expected.
Note: if you want to connect to Ignite's in-memory database during your research, you might stumble on this VM argument: -DIGNITE_H2_DEBUG_CONSOLE=true. Be aware that the Ignite team deprecated IGNITE_H2_DEBUG_CONSOLE in version 2.8 in favor of their thin JDBC driver. So if you want to connect to the database, please refer to the updated documentation; long story short, the JDBC URL is jdbc:ignite:thin://127.0.0.1/ with the default port 10800, and IntelliJ provides first-class support for it in its database data sources.
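For a quick programmatic check instead of an IDE data source, a plain java.sql sketch could look like the following. It assumes the Ignite thin JDBC driver (org.apache.ignite.IgniteJdbcThinDriver from ignite-core) is on the classpath and registers itself with DriverManager. It also assumes a node running locally on the default port 10800, and the WIKIPEDIA_SUMMARIES.WIKIPEDIASUMMARY table with title/description columns produced by the @QuerySqlField setup above. The helper names are hypothetical.

```kotlin
import java.sql.DriverManager

// Builds a thin-driver URL; 10800 is the default port mentioned above.
fun thinJdbcUrl(host: String = "127.0.0.1", port: Int = 10800): String =
    "jdbc:ignite:thin://$host:$port/"

// Hypothetical helper: lists (title, description) pairs of cached summaries via SQL.
// Requires the Ignite thin JDBC driver on the classpath and a reachable node.
fun listSummaries(url: String = thinJdbcUrl()): List<Pair<String, String>> =
    DriverManager.getConnection(url).use { connection ->
        connection.createStatement().use { statement ->
            statement.executeQuery("SELECT title, description FROM WIKIPEDIA_SUMMARIES.WIKIPEDIASUMMARY").use { rs ->
                val rows = mutableListOf<Pair<String, String>>()
                while (rs.next()) rows += rs.getString("title") to rs.getString("description")
                rows
            }
        }
    }
```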
Distributed Locks
Another useful feature of Apache Ignite is its API for distributed locks. Suppose our enhanceWithProgrammingLanguageDescription method is a slow, intensive operation dealing with the cache and other resources, and we don't want other threads on the same instance, or even requests to a different instance, to interfere or alter anything until the operation is complete. This is where IgniteLock comes into play: this interface offers a comprehensive API for managing distributed reentrant locks, similar to java.util.concurrent.ReentrantLock. You can create instances of these locks using Ignite's reentrantLock() method. IgniteLock protects against node failures via the failoverSafe flag: when it is set to true, the lock is automatically released if the owning node fails, ensuring uninterrupted lock management across the cluster. On the other hand, if failoverSafe is set to false, a node failure will result in an IgniteException, rendering the lock unusable. With this in mind, let's try to guard our so-called "critical section."
private fun enhanceWithProgrammingLanguageDescription(course: Course) {
// arguments: name, failoverSafe, fair, create
val lock = igniteInstance.reentrantLock(SUMMARIES_LOCK, true, true, true)
if (!lock.tryLock()) throw LockAcquisitionException(SUMMARIES_LOCK, "enhanceWithProgrammingLanguageDescription")
try {
log.debug("Acquired lock {}", lock)
Thread.sleep(2000) // simulate slow, intensive work
val summaries = wikipediaSummaryRepository.cache()
log.debug("Fetched ignite cache [$WIKIPEDIA_SUMMARIES] = size(${summaries.size()})]")
wikipediaSummaryRepository.findById(course.programmingLanguage).orElseGet {
wikipediaApiClient.fetchSummaryFor("${course.programmingLanguage}_(programming_language)")?.let {
log.debug("No cache value found, using wikipedia's response $it to update $course programming language description")
wikipediaSummaryRepository.save(course.programmingLanguage, it)
it
}
}?.let { course.programmingLanguageDescription = it.summary }
} finally {
lock.unlock() // always release the lock, even if the work above throws
}
}
As you can see, the implementation is quite simple: we obtain the lock via the igniteInstance's reentrantLock method and then try to lock it with tryLock(). Locking succeeds if the lock is available or already held by the current thread, in which case tryLock() immediately returns true. Otherwise, it returns false, and we throw a LockAcquisitionException. Then we simulate some intensive work by sleeping for 2 seconds with Thread.sleep(2000), and in the end, we release the acquired lock with unlock().
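Since IgniteLock follows the java.util.concurrent.locks.Lock contract, the fail-fast pattern can be tried out locally with a plain ReentrantLock. This is a single-JVM stand-in, not a distributed lock: withTryLock is a hypothetical helper, and LockAcquisitionException here is a local substitute for the article's class, using the same message format. The try/finally guarantees that unlock() runs even if the guarded work throws.

```kotlin
import java.util.concurrent.locks.ReentrantLock

// Local stand-in for the article's LockAcquisitionException.
class LockAcquisitionException(lockName: String, operation: String) :
    RuntimeException("[$lockName] could not be acquired for [$operation] operation. Please try again.")

// Runs block only if lock is free (or already held by this thread); fails fast otherwise.
fun <T> withTryLock(lock: ReentrantLock, lockName: String, operation: String, block: () -> T): T {
    if (!lock.tryLock()) throw LockAcquisitionException(lockName, operation)
    try {
        return block()
    } finally {
        lock.unlock() // released even if block() throws
    }
}
```

Failing fast with tryLock() keeps request threads from piling up behind a slow operation; the alternative, tryLock(timeout, unit), trades that for a short wait before giving up.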
Now, if we run a single instance of our app on port 8080 and send 2 requests within the same 2-second window, one will pass and the other will fail:
ERROR 36580 --- [nio-8080-exec-2] e.c.w.r.e.RESTExceptionHandler$Companion : Exception while handling request [summaries-lock] could not be acquired for [enhanceWithProgrammingLanguageDescription] operation. Please try again.
inc.evil.coursecatalog.common.exceptions.LockAcquisitionException: [summaries-lock] could not be acquired for [enhanceWithProgrammingLanguageDescription] operation. Please try again.
The same goes if we send 1 request to the 8080 instance of our app and the next one, within the 2-second window, to the 8060 instance: the first request will succeed, while the second one will fail.
Code Deployment
Now let's switch our attention to reviews-service, and remember: this service is totally unaware of courses; it is just a way to add reviews for some course_id. With this in mind, we have this entity:
@Table("reviews")
data class Review(
@Id
var id: Int? = null,
var text: String,
var author: String,
@Column("created_at")
@CreatedDate
var createdAt: LocalDateTime? = null,
@LastModifiedDate
@Column("last_modified_at")
var lastModifiedAt: LocalDateTime? = null,
@Column("course_id")
var courseId: Int? = null
)
And we have this method in ReviewServiceImpl.
So, our new (admittedly silly) feature request is to check whether the course that a review has been written for exists. How can we do that? The most obvious choice would be to invoke a REST endpoint on courses-service to check if there is a course with the review's course_id, but that is not what this article is about. We have Apache Ignite, right? We are going to invoke code from courses-service from reviews-service via Ignite's cluster.
To do that, we need some kind of API or gateway module, published as an artifact, that courses-service can implement and reviews-service can depend on and use to invoke the code.
First things first: let's design the new module, courses-api:
plugins {
id("org.springframework.boot") version "2.7.3"
id("io.spring.dependency-management") version "1.0.13.RELEASE"
kotlin("jvm") version "1.6.21"
kotlin("plugin.spring") version "1.6.21"
kotlin("plugin.jpa") version "1.6.21"
`maven-publish`
}
group = "inc.evil"
version = "0.0.1-SNAPSHOT"
repositories {
mavenCentral()
}
publishing {
publications {
create<MavenPublication>("maven") {
groupId = "inc.evil"
artifactId = "courses-api"
version = "1.1"
from(components["java"])
}
}
}
dependencies {
implementation("org.springframework.boot:spring-boot-starter-actuator")
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
implementation("org.jetbrains.kotlin:kotlin-reflect")
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-rx2:1.6.4")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.4")
implementation("org.apache.commons:commons-lang3:3.12.0")
implementation("org.apache.ignite:ignite-core:2.15.0")
testImplementation("org.junit.jupiter:junit-jupiter-api:5.8.1")
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:5.8.1")
}
tasks.getByName<Test>("test") {
useJUnitPlatform()
}
Nothing fancy here, except the maven-publish plugin, which we'll use to publish the artifact to the local Maven repository.
Here is the interface that courses-service will implement and reviews-service will use:
interface CourseApiFacade: Service {
companion object {
const val COURSE_API_FACADE_SERVICE_NAME = "CourseApiFacade"
}
fun findById(id: Int): CourseApiResponse
}
data class InstructorApiResponse(
val id: Int?,
val name: String?,
val summary: String?,
val description: String?
)
data class CourseApiResponse(
val id: Int?,
val name: String,
val category: String,
val programmingLanguage: String,
val programmingLanguageDescription: String?,
val createdAt: String,
val updatedAt: String,
val instructor: InstructorApiResponse
)
You might've noticed that CourseApiFacade extends the org.apache.ignite.services.Service interface. This makes it a grid-managed service: the entry point for the services that Ignite can deploy in the cluster.
With this module configured, we can add it as a dependency to courses-service:
implementation(project(":courses-api"))
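Gradle can resolve project(":courses-api") only when the module is part of the same build. Here is a minimal sketch of the settings file. It assumes courses-api sits in the same multi-project Gradle build as courses-service; the exact layout of the repository is not shown here.

```kotlin
// settings.gradle.kts of the Gradle build that contains courses-service (assumed layout).
// Without this include, the project(":courses-api") dependency cannot be resolved.
include("courses-api")
```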
And implement the exposed interface like this:
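import org.apache.ignite.Ignite
import org.apache.ignite.resources.IgniteInstanceResource
import org.apache.ignite.resources.SpringResource
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component
@Component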
class CourseApiFacadeImpl : CourseApiFacade {
@Transient
@SpringResource(resourceName = "courseService")
lateinit var courseService: CourseServiceImpl
@Transient
@IgniteInstanceResource //spring constructor injection won't work since ignite is not ready
lateinit var igniteInstance: Ignite
companion object {
private val log: Logger = LoggerFactory.getLogger(this::class.java)
}
override fun findById(id: Int): CourseApiResponse = courseService.findById(id).let {
CourseApiResponse(
id = it.id,
name = it.name,
category = it.category.toString(),
programmingLanguage = it.programmingLanguage,
programmingLanguageDescription = it.programmingLanguageDescription,
createdAt = it.createdAt.toString(),
updatedAt = it.updatedAt.toString(),
instructor = InstructorApiResponse(it.instructor?.id, it.instructor?.name, it.instructor?.summary, it.instructor?.description)
)
}
override fun cancel() {
log.info("Canceling service")
}
override fun init() {
log.info("Before deployment :: Pre-initializing service before execution on node {}", igniteInstance.cluster().forLocal().node())
}
override fun execute() {
log.info("Deployment :: The service is deployed on grid node {}", igniteInstance.cluster().forLocal().node())
}
}
As you can see, CourseApiFacadeImpl implements the findById method of CourseApiFacade. For debugging purposes, it also overrides some methods of the Service interface. When a service is deployed on a cluster node, Ignite invokes that service's execute() method. Likewise, when a deployed service is canceled, Ignite automatically invokes its cancel() method. init() is guaranteed to be called before execute(). There are also some new annotations:
- @SpringResource(resourceName = "courseService"): annotates a field or a setter method for injection of resources from the Spring ApplicationContext. Since this is now an Ignite service, we need to let Ignite take care of bean injection. resourceName has to match the bean's name in the Spring ApplicationContext.
- @IgniteInstanceResource: since this class is going to be deployed, we can't rely on Spring for autowiring anymore. Ignite offers this annotation to inject an Ignite instance into grid tasks, grid jobs, and services.
- @Transient (transient in Java): makes sure that we don't serialize unnecessary object hierarchies across the cluster.
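The small sketch below shows what @Transient buys us. It uses plain JDK serialization as a stand-in for Ignite's own marshalling, and the names HeavyCourseService, FacadeSketch, and roundTrip are hypothetical. The injected dependency is left out of the serialized form, so the copy arrives without it. In the real service, Ignite's resource annotations are what inject it on each node.

```kotlin
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.io.ObjectInputStream
import java.io.ObjectOutputStream
import java.io.Serializable

// Hypothetical stand-in for a heavy Spring bean such as CourseServiceImpl (deliberately NOT Serializable).
class HeavyCourseService {
    fun findName(id: Int) = "course-$id"
}

// Hypothetical facade: `name` is serialized, the transient dependency is not.
class FacadeSketch(val name: String) : Serializable {
    @Transient
    lateinit var courseService: HeavyCourseService

    fun isServiceInjected(): Boolean = this::courseService.isInitialized
}

// Serializes and deserializes a value with plain JDK serialization.
fun <T : Serializable> roundTrip(value: T): T {
    val bytes = ByteArrayOutputStream().use { buffer ->
        ObjectOutputStream(buffer).use { it.writeObject(value) }
        buffer.toByteArray()
    }
    @Suppress("UNCHECKED_CAST")
    return ObjectInputStream(ByteArrayInputStream(bytes)).use { it.readObject() as T }
}

fun main() {
    val facade = FacadeSketch("CourseApiFacade").apply { courseService = HeavyCourseService() }
    val copy = roundTrip(facade)
    println("${copy.name} injected=${copy.isServiceInjected()}") // CourseApiFacade injected=false
}
```

Without @Transient, writeObject would fail with a NotSerializableException, because HeavyCourseService does not implement Serializable.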
For everything mentioned above to work, we have to slightly modify the Ignite dependencies in our build.gradle:
implementation("org.apache.ignite:ignite-kubernetes:2.15.0")
implementation("org.apache.ignite:ignite-indexing:2.15.0")
implementation("org.apache.ignite:ignite-core:2.15.0")
implementation("org.apache.ignite:ignite-spring:2.15.0")
implementation("org.apache.ignite:ignite-spring-data-ext:2.0.0")
We got rid of ignite-spring-boot-autoconfigure in favor of ignite-spring, since I couldn't make Ignite aware of Spring's application context with the autoconfiguration. As you might've guessed, since we no longer have IgniteAutoConfiguration, we have to write the IgniteConfiguration manually. Don't worry, though: the two approaches are quite similar. Here's the updated IgniteConfig in courses-service:
@Configuration
@Profile("!test")
@EnableConfigurationProperties(value = [IgniteProperties::class])
@EnableIgniteRepositories(basePackages = ["inc.evil.coursecatalog.ignite"])
class IgniteConfig(
val igniteProperties: IgniteProperties,
val applicationContext: ApplicationContext
) {
companion object {
const val WIKIPEDIA_SUMMARIES = "WIKIPEDIA_SUMMARIES"
}
@Bean(name = ["igniteInstance"])
fun igniteInstance(igniteConfiguration: IgniteConfiguration): Ignite {
return IgniteSpring.start(igniteConfiguration, applicationContext)
}
@Bean
fun igniteConfiguration(): IgniteConfiguration {
val igniteConfiguration = IgniteConfiguration()
igniteConfiguration.setIgniteInstanceName(igniteProperties.instanceName)
igniteConfiguration.setMetricsLogFrequency(0) // no spam
igniteConfiguration.setCommunicationSpi(configureTcpCommunicationSpi()) // avoid OOM due to message limit
igniteConfiguration.setDiscoverySpi(configureDiscovery()) // allow possibility to switch to Kubernetes
igniteConfiguration.setCacheConfiguration(wikipediaSummaryCacheConfiguration()) //vararg
return igniteConfiguration
}
}
Not that much of a change, right? Instead of an IgniteConfigurer, we declared an igniteConfiguration bean of type IgniteConfiguration that holds our configuration. We injected the applicationContext into our config so that we can pass it to the rewritten igniteInstance bean, which is now started through the Spring-aware IgniteSpring.start().
Now that we've updated our configuration, we have to tell Ignite about our new Ignite service, CourseApiFacade:
@Bean
fun igniteConfiguration(): IgniteConfiguration {
val igniteConfiguration = IgniteConfiguration()
igniteConfiguration.setIgniteInstanceName(igniteProperties.instanceName)
igniteConfiguration.setPeerClassLoadingEnabled(true)
igniteConfiguration.setMetricsLogFrequency(0) // no spam
igniteConfiguration.setCommunicationSpi(configureTcpCommunicationSpi()) // avoid OOM due to message limit
igniteConfiguration.setDiscoverySpi(configureDiscovery()) // allow possibility to switch to Kubernetes
igniteConfiguration.setCacheConfiguration(wikipediaSummaryCacheConfiguration()) //vararg
igniteConfiguration.setServiceConfiguration(courseApiFacadeConfiguration()) //vararg
return igniteConfiguration
}
private fun courseApiFacadeConfiguration(): ServiceConfiguration {
val serviceConfiguration = ServiceConfiguration()
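serviceConfiguration.setService(courseApiFacade) // the CourseApiFacadeImpl bean, injected into IgniteConfig
serviceConfiguration.setName(CourseApiFacade.COURSE_API_FACADE_SERVICE_NAME)
serviceConfiguration.setMaxPerNodeCount(1) // at most one service instance per node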
return serviceConfiguration
}
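We create a ServiceConfiguration bound to courseApiFacade, named after the constant exposed by the interface in courses-api, and limited to one service instance per node. Lastly, we register courseApiFacadeConfiguration() in the IgniteConfiguration. courseApiFacade is the CourseApiFacadeImpl bean, so it has to be available in IgniteConfig, for example injected through its constructor next to igniteProperties and applicationContext. Note that we also enabled peer class loading with setPeerClassLoadingEnabled(true).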
Now back to reviews-service. First, we add the required Apache Ignite dependencies. Since reviews-service is much simpler and doesn't need the Spring-aware Ignite, we'll stick with ignite-spring-boot-autoconfigure here:
implementation("org.apache.ignite:ignite-core:2.15.0")
implementation("org.apache.ignite:ignite-kubernetes:2.15.0")
implementation("org.apache.ignite:ignite-indexing:2.15.0")
implementation("org.apache.ignite:ignite-spring-boot-autoconfigure-ext:1.0.0")
implementation("org.apache.ignite:ignite-spring-data-ext:2.0.0")
As I mentioned earlier, reviews-service is going to use the interface from courses-api. We can run the publishMavenPublicationToMavenLocal Gradle task on courses-api to publish our artifact. Then we can add the following dependency to reviews-service:
implementation("inc.evil:courses-api:1.1")
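Gradle only looks in the local Maven repository (~/.m2/repository by default) when that repository is declared explicitly. Here is a minimal sketch of the repositories block reviews-service would need, assuming its build script doesn't already declare mavenLocal():

```kotlin
// reviews-service build.gradle.kts (sketch): make the locally published artifact resolvable.
repositories {
    mavenLocal()   // where publishMavenPublicationToMavenLocal put inc.evil:courses-api:1.1
    mavenCentral() // everything else
}
```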
Now we need to configure Ignite here as well, similarly to what we did in courses-service:
@Configuration
@EnableConfigurationProperties(value = [IgniteProperties::class])
@EnableIgniteRepositories(basePackages = ["inc.evil.reviews.ignite"])
class IgniteConfig(val igniteProperties: IgniteProperties) {
@Bean(name = ["igniteInstance"])
fun igniteInstance(ignite: Ignite): Ignite {
return ignite
}
@Bean
fun configurer(): IgniteConfigurer {
return IgniteConfigurer { igniteConfiguration: IgniteConfiguration ->
igniteConfiguration.setIgniteInstanceName(igniteProperties.instanceName)
igniteConfiguration.setClientMode(true)
igniteConfiguration.setMetricsLogFrequency(0) // no spam
igniteConfiguration.setCommunicationSpi(configureTcpCommunicationSpi()) // avoid OOM due to message limit
igniteConfiguration.setDiscoverySpi(configureDiscovery()) // allow possibility to switch to Kubernetes
}
}
private fun configureTcpCommunicationSpi(): TcpCommunicationSpi {
val tcpCommunicationSpi = TcpCommunicationSpi()
tcpCommunicationSpi.setMessageQueueLimit(1024)
return tcpCommunicationSpi
}
private fun configureDiscovery(): TcpDiscoverySpi {
val spi = TcpDiscoverySpi()
var ipFinder: TcpDiscoveryIpFinder? = null
if (igniteProperties.discovery.tcp.enabled) {
ipFinder = TcpDiscoveryMulticastIpFinder()
ipFinder.setMulticastGroup(DFLT_MCAST_GROUP)
} else if (igniteProperties.discovery.kubernetes.enabled) {
ipFinder = TcpDiscoveryKubernetesIpFinder()
ipFinder.setNamespace(igniteProperties.discovery.kubernetes.namespace)
ipFinder.setServiceName(igniteProperties.discovery.kubernetes.serviceName)
}
spi.setIpFinder(ipFinder)
return spi
}
}
Apart from using the autoconfiguration's IgniteConfigurer and declaring no caches or services of its own, the main difference from courses-service is that reviews-service runs in client mode. Okay, with Ignite properly configured, it's time to use the Ignite service from courses-service in reviews-service. For this purpose, I created this class:
@Component
class IgniteCoursesGateway(private val igniteInstance: Ignite) {
fun findCourseById(id: Int) = courseApiFacade().findById(id)
private fun courseApiFacade(): CourseApiFacade {
return igniteInstance.services(igniteInstance.cluster().forServers())
.serviceProxy(CourseApiFacade.COURSE_API_FACADE_SERVICE_NAME, CourseApiFacade::class.java, false)
}
}
IgniteCoursesGateway is our entry point into the courses domain via the Ignite cluster. Using the autowired igniteInstance, we retrieve a serviceProxy of type CourseApiFacade for the name COURSE_API_FACADE_SERVICE_NAME, scoped to the cluster's server nodes with forServers(). By setting the sticky flag to false, we also tell Ignite to load-balance calls between the deployed service instances instead of always contacting the same one. Then, in findCourseById(), we simply use the obtained proxy to query the desired course by id.
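Here is a deliberately simplified model of what the sticky flag means, written in plain Kotlin on top of a JDK dynamic proxy (java.lang.reflect.Proxy). CourseLookup, NodeLocalCourseLookup, and proxyFor are hypothetical names. Round-robin is only an illustrative policy, not how Ignite actually picks a node. A sticky proxy keeps calling the instance it picked first, while a non-sticky one may route every call to a different instance.

```kotlin
import java.lang.reflect.InvocationHandler
import java.lang.reflect.Proxy
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical, simplified version of the courses-api contract.
interface CourseLookup {
    fun findById(id: Int): String
}

// One "deployed instance" per server node; nodeName only tags the result.
class NodeLocalCourseLookup(private val nodeName: String) : CourseLookup {
    override fun findById(id: Int) = "course $id served by $nodeName"
}

// Builds a dynamic proxy over the instances.
// sticky = true: every call goes to the first instance.
// sticky = false: calls rotate round-robin across instances (illustrative policy only).
fun proxyFor(instances: List<CourseLookup>, sticky: Boolean): CourseLookup {
    require(instances.isNotEmpty()) { "no deployed instances" }
    val counter = AtomicInteger()
    val pinned = instances.first()
    val handler = InvocationHandler { _, method, args ->
        val target = if (sticky) pinned else instances[Math.floorMod(counter.getAndIncrement(), instances.size)]
        method.invoke(target, *(args ?: emptyArray<Any?>()))
    }
    return Proxy.newProxyInstance(
        CourseLookup::class.java.classLoader,
        arrayOf(CourseLookup::class.java),
        handler
    ) as CourseLookup
}

fun main() {
    val nodes = listOf(NodeLocalCourseLookup("node-A"), NodeLocalCourseLookup("node-B"))
    val roaming = proxyFor(nodes, sticky = false)
    println(roaming.findById(39)) // course 39 served by node-A
    println(roaming.findById(39)) // course 39 served by node-B
    val pinned = proxyFor(nodes, sticky = true)
    println(pinned.findById(39)) // course 39 served by node-A
    println(pinned.findById(39)) // course 39 served by node-A
}
```

The caller only ever sees the CourseLookup interface, just as reviews-service only sees CourseApiFacade. Routing is entirely the proxy's business.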
All that's left is to use IgniteCoursesGateway in ReviewServiceImpl to fulfill the feature's requirements:
override suspend fun save(review: Review): Review {
runCatching {
igniteCoursesGateway.findCourseById(review.courseId!!).also { log.info("Call to ignite ended with $it") }
}.onFailure { log.error("Oops, ignite remote execution failed due to ${it.message}", it) }
.getOrNull() ?: throw NotFoundException(CourseApiResponse::class, "course_id", review.courseId.toString())
return reviewRepository.save(review).awaitFirst()
}
The logic is as follows: before saving, we try to find the course matching the review's course_id by invoking findCourseById in our Ignite cluster. The call can fail, since CourseApiFacadeImpl will throw a NotFoundException if the requested course was not found. In that case, we log the error and throw a reviews-service NotFoundException stating that the course could not be found. If a course is returned, we proceed to save the review, and that's it.
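To isolate the validate-then-save guard, here is a synchronous sketch in plain Kotlin. Review, NotFoundException, and saveIfCourseExists are hypothetical simplified stand-ins. findCourse plays the role of igniteCoursesGateway.findCourseById, and save replaces the reactive reviewRepository.save(review).awaitFirst() call of the suspend function above.

```kotlin
// Hypothetical, simplified stand-ins for the article's types.
data class Review(val courseId: Int, val text: String)

class NotFoundException(message: String) : RuntimeException(message)

// Any failure of the remote lookup (or a null result) becomes a local NotFoundException,
// and save is only reached when a course was found.
fun saveIfCourseExists(
    review: Review,
    findCourse: (Int) -> Any?,
    save: (Review) -> Review
): Review {
    runCatching { findCourse(review.courseId) }
        .onFailure { println("Remote lookup failed: ${it.message}") } // logged, then mapped below
        .getOrNull()
        ?: throw NotFoundException("Course with course_id equal to [${review.courseId}] could not be found!")
    return save(review)
}

fun main() {
    val courses = mapOf(39 to "C++ Development")
    // Throws for unknown ids, the way the remote lookup does.
    val findCourse: (Int) -> Any? = { id -> courses[id] ?: throw IllegalArgumentException("No course $id") }
    println(saveIfCourseExists(Review(39, "Amazing, loved it!"), findCourse) { it })
    val rejected = runCatching { saveIfCourseExists(Review(999, "?"), findCourse) { it } }
    println(rejected.exceptionOrNull()?.message)
}
```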
Now let's restart courses-service and observe the logs:
INFO 23372 --- [a-67c579c6ea47%] i.e.c.f.i.CourseApiFacadeImpl$Companion : Before deployment :: Pre-initializing service before execution on node TcpDiscoveryNode …
INFO 23372 --- [a-67c579c6ea47%] o.a.i.i.p.s.IgniteServiceProcessor : Starting service instance [name=CourseApiFacade, execId=52de6edc-ac6f-49d4-8c9e-17d6a6ebc8d5]
INFO 23372 --- [a-67c579c6ea47%] i.e.c.f.i.CourseApiFacadeImpl$Companion : Deployment :: The service is deployed on grid node TcpDiscoveryNode …
Judging by the log statements in our overridden Service methods, CourseApiFacade was deployed successfully. With courses-service running, if we now start reviews-service, we'll see the following log:
INFO 13708 --- [ main] o.a.i.i.m.d.GridDiscoveryManager : Topology snapshot [ver=2, locNode=cb90109d, servers=1, clients=1, state=ACTIVE, CPUs=16, offheap=6.3GB, heap=4.0GB...
INFO 13708 --- [ main] o.a.i.i.m.d.GridDiscoveryManager : ^-- Baseline [id=0, size=1, online=1, offline=0]
You may notice that we have 1 server running and 1 client. Now let’s try a request to add a review for an existing course (reviews-service is using GraphQL).
GRAPHQL http://localhost:8070/graphql
Content-Type: application/graphql
mutation { createReview(request: {text: "Amazing, loved it!" courseId: 39 author: "Mike Scott"}) {
id
text
author
courseId
createdAt
lastModifiedAt
}
}
In the logs, we’ll notice:
INFO 13708 --- [actor-tcp-nio-1] i.e.r.s.i.ReviewServiceImpl$Companion : Call to ignite ended with CourseApiResponse(id=39, name=C++ Development, category=TUTORIAL …
And in the courses-service logs, we’ll notice the code execution:
DEBUG 29316 --- [2-64cc57b09c89%] i.e.c.c.aop.LoggingAspect$Companion : before :: execution(public inc.evil.coursecatalog.model.Course inc.evil.coursecatalog.service.impl.CourseServiceImpl.findById(int))
This means that the request was executed successfully. If we try the same request for a non-existent course, say ID 999, we'll observe the NotFoundException in reviews-service:
WARN 33188 --- [actor-tcp-nio-1] .w.g.e.GraphQLExceptionHandler$Companion : Exception while handling request: CourseApiResponse with course_id equal to [999] could not be found!
Conclusion
Alright, everyone, that's a wrap! I trust you now have a good grasp of what Apache Ignite is all about. We designed a simple distributed cache using Ignite and Spring Boot, explored Ignite's Spring Data support and its distributed locks for guarding critical sections of code, and, finally, saw how Apache Ignite's code deployment can execute code within the cluster.
Once again, if you missed it, you can access all the code we discussed in the link at the beginning of this article.
Happy coding!