Publishing Domain Events with Spring Integration EventBus
How to use Spring Integration to keep domain and integration concerns separate and simple for many Spring-based applications.
Join the DZone community and get the full member experience.
Join For Freeintegrations can become messy if you do not take them seriously -- especially when you have both inbound and outbound integrations at the same point. integration concerns can unwillingly pollute your domain if you do not have proper foundations laid down in the first place. hexagonal architecture helps keep your domain clean from security, transactional and other non-functional aspects by handing them over to the application service layer. but still there are integration challenges that we have to address outside the application service layer -- specifically, those external integrations that originate from domain events published by application domain.
if you are already using spring framework in your project, then having spring integration layer greatly simplifies these challenges. although apache camel can also be a good candidate, i found spring integration more appealing because of its simplicity and compatibility with other spring projects.
in this article, i will demonstrate how i used spring integration in one of my recent project to keep domain oblivious to integration concerns and simplified at the same time. the main goal of the project was to expedite the process of emigrating passenger at a very busy airport. the project had lots of inbound and outbound integrations with external systems belonging to airline, airport, residency & foreign affairs sector, etc. below diagram depicts the architecture of software application that was the outcome of that project.
on the left side of the diagram, these small blocks in the rest api layer represents workflow activities which are responsible for manipulating the state of underlying domain objects. rest api layer communicates with application api via these workflow activites. this also involves necessary back and forth translation between resource and domain objects. different types of request which are shown on left side comes from front-end application. requests which does not require any external input goes directly to application service via workflow activities. whereas, some request that requires external system intervention, triggers integration flows to gather the necessary information from outside world and then send the enriched request to application service api layer. these external integrations can be greatly simplified by interacting with a java interface backed up by a messaging gateway provided by spring integration. the bottom left rest api requests terminate at a workflow activity which triggers an integration flow with an outside system before calling the application service.
whereas, domain events (shown with dotted arrows pointing outwards from the domain) are published by the domain in effect of certain business incidents. these events are published to outside world so they can also react onto them by subscribing to those events. you will be lucky enough, if all the other applications are in downstream and interpretation of event messages generated by your bounded context is their responsibility. hence, in most of the cases, it is your responsibility to construct the message into their respective formats and put them into some messaging system, so that they can pick from it. not only that, you may sometime require to inform multiple system for a single event. this requires transformation, service orchestration, routing and lots of other integration concerns to care about. also, not to forget you are also required to persist all domain events into some event store as a snapshot for future reference.
to address these concerns, we will implement a custom event bus which we will be using to publish events from application domain. we then be able to consume these events directly into spring integration flows, just like shown in above diagram, to notify external systems into their respective formats. there you will be having full liberty of addressing integration concerns using eip patterns in spring integration flows.
below is the basic interface of eventbus that will be used to publish and subscribe events. complete source code is available over github .
package org.springframework.integration.eventbus;
public interface eventbus{
public boolean publish(event event);
boolean subscribe(string topic, object subscriber);
boolean unsubscribe(string topic, object subscriber);
}
behind the scenes, this eventbus implementation uses publish-subscribe-channel to send the events to all subscribes. first argument in eventbus subscribe/unscribe methods should be the name of that publish-subscribe-channel. whereas for pulishing domain events, eventbus will be able to extract it from event type. below code snippet shows how a domain event has to be defined:
public interface event {
public string fortopic();
}
public abstract class domainevent implements event {
public int eventversion;
private date createdon;
private transient string topic;
protected domainevent(date createdon) {
this.setcreatedon(createdon);
this.settopic("domaineventschannel");
this.seteventversion(1);
}
private void settopic(string topic) {
this.topic = topic;
}
private void setcreatedon(date createdon) {
this.createdon = createdon;
}
public date occurredon() {
return createdon;
}
public string fortopic() {
return topic;
}
private void seteventversion(int eventversion) {
this.eventversion = eventversion;
}
public int eventversion() {
return eventversion;
}
}
public class foodomainevent extends domainevent {
public foodomainevent() {
super(new date());
}
}
here "domaineventschannel" is the name of the publish-subscribe-channel that has to be configured for eventbus as mentioned above. below is the example spring configuration that wires up all necessary components together.
<?xml version="1.0" encoding="utf-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/xmlschema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemalocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration.xsd">
<context:annotation-config />
<!-- helper utility used by domain to publish event using eventbus -->
<bean class="com.foobar.domain.model.domaineventpublisher"/>
<bean id="eventbus" class="org.springframework.integration.eventbus.springeventbus" />
<!-- publish subscriber channel used by eventbus -->
<int:publish-subscribe-channel id="domaineventschannel"/>
<!-- eventstore where all events has to be persisted -->
<bean id="eventstore" class="com.foobar.infrastructure.eventstore.inmemoryeventstore" />
<int:service-activator input-channel="domaineventschannel" ref="eventstore" />
<import resource="foo-bar-integration-flow.xml"/>
</beans>
domaineventpublisher is a convenient class that delegates call to eventbus for publishing the event. it is an implementation of ambient context pattern, that helps you in publishing events from anywhere deep down in hierarchy of domain objects. you can provide access to eventbus as per your own taste, if trade-offs of using that pattern are not acceptable to you. rest of the configuration is quite explanatory. hence, let us move to foo-bar-integration-flow.xml.
<?xml version="1.0" encoding="utf-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/xmlschema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:stream="http://www.springframework.org/schema/integration/stream"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemalocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/stream
http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd">
<bean id="foobarapplicationservice" class="com.foobar.application.foobarapplicationservice"/>
<int:payload-type-router input-channel="domaineventschannel" default-output-channel="nullchannel">
<int:mapping type="com.foobar.domain.model.bardomainevent" channel="barevents"/>
<int:mapping type="com.foobar.domain.model.foodomainevent" channel="fooevents"/>
</int:payload-type-router>
<int:publish-subscribe-channel id="fooevents"/>
<int:publish-subscribe-channel id="barevents"/>
<int:chain input-channel="barevents">
<int:transformer expression="payload.getclass().getname()"/>
<stream:stdout-channel-adapter append-newline="true"/>
</int:chain>
<int:service-activator input-channel="fooevents" ref="foobarapplicationservice"/>
</beans>
in above integration flow, we are routing domain events into two different channels one that prints out barevent at console and the other one calls application service method that is annotated with subsribeevent annotation, as shown in below code snippet.
import org.springframework.integration.eventbus.subscribeevent;
import org.springframework.stereotype.service;
import com.foobar.domain.model.bardomainevent;
import com.foobar.domain.model.domainevent;
@service
public class foobarapplicationservice extends basefoobarapplicationservice{
@subscribeevent
public void when(bardomainevent event){
system.out.println(this.getclass().getsimplename()+" received event "+event.getclass().getsimplename());
}
@subscribeevent
public void when(domainevent event) {
system.out.println(this.getclass().getsimplename()+" received event "+event.getclass().getsimplename());
}
}
although there is not much happening in above integration flow, but it is more than enough to illustrate how powerful that tool can be. you can now have the full liberty of routing, transforming, and publishing the event by any mean you want.
if you find above way of handing events bit verbose, in terms of notifying the subscriber beans, then you can use below configuration to make it more concise.
<?xml version="1.0" encoding="utf-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/xmlschema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:stream="http://www.springframework.org/schema/integration/stream"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemalocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/stream
http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd">
<context:annotation-config />
<context:component-scan base-package="com.foobar.application"/>
<bean class="com.foobar.domain.model.domaineventpublisher"/>
<!-- event subscriber bean processor -->
<bean id="eventsubscriberbeanprocessor" class="org.springframework.integration.eventbus.eventsubscriberbeanprocessor" />
<bean id="eventbus" class="org.springframework.integration.eventbus.springeventbus" />
<bean id="eventstore" class="com.foobar.infrastructure.eventstore.inmemoryeventstore" />
<int:publish-subscribe-channel id="domaineventschannel"/>
</beans>
in above configuration "eventsubscriberbeanprocessor" automatically registers all the beans that have methods annotated with subscribeevent annotation. therefore, you do not have to always explicitly register them like we had done for "eventstore" and "foobarapplicationservice". although this will give more advantage over previous configuration, make sure you should not violate the transaction boundary constraint of aggregates . which states, only one aggregate can be modified in one transaction. finally, below test case reveals how domain event can be published.
@runwith(junit4.class)
public class springeventbustestcase{
@test
public void springeventbusverbosesubscribetest() {
applicationcontext context = new classpathxmlapplicationcontext("/meta-inf/spring/integration/spring-eventbus-verbose-subscribe-ctxt.xml");
foobarapplicationservice applicationservice = context.getbean(foobarapplicationservice.class);
applicationservice.updatefoo();
applicationservice.updatebar();
// test sending arbitrary domain event to validate that it gets consumed by basefoobarapplicationservice
domaineventpublisher.publish(new domainevent(new date()){});
eventstore eventstore = context.getbean(eventstore.class);
//verify all above domain events are logged in event store
assert.asserttrue(eventstore.alldomaineventssince(-1l).size()==3);
}
}
notice how i have used domaineventpublisher to publish an event. whereas, rest of the events are being published as from inside domain when updatefoo and updatebar methods are called.
that is it for the day. grab the source code from github play with it and enlighten me with your feedback.
Opinions expressed by DZone contributors are their own.
Comments