What Are Microservices and the Event Aggregator Pattern?
Learn about the Event Aggregator pattern for organizing event communication in a microservices architecture with an example.
One common challenge arises in modern enterprise solutions built on the microservices architecture: effectively handling the flow of events from various sources. Many solutions employ diverse event providers such as Azure Service Bus, Apache Kafka, or RabbitMQ. To tackle this issue, a crucial component is needed to bring together event publishers and subscribers seamlessly.
Solution
Before we go deep into the implementation, we can look at Martin Fowler's Event Aggregator.
"An Event Aggregator is a simple element of indirection. In its simplest form, you have it register with all the source objects you are interested in and register all target objects with the Event Aggregator. The Event Aggregator responds to any event from a source object by propagating that event to the target objects."
Other patterns follow the principle of Event-Aggregation:
- Event Bus Pattern: The event bus pattern is similar to the pub-sub pattern but focuses on an event bus as a centralized communication channel. Publishers publish events on the event bus, and subscribers listen to events on the bus. The event bus handles the routing and distribution of events to the appropriate subscribers. This pattern is often used in microservices architectures for inter-service communication.
- Event-driven Microservices Pattern: In this pattern, event aggregators facilitate communication and coordination between microservices. Each microservice publishes events related to its activities, and other microservices can subscribe to those events. This enables loose coupling between microservices, as they can react to events without direct dependencies on each other. Event-driven microservices patterns help in building scalable and decoupled systems.
- Command Query Responsibility Segregation (CQRS) Pattern: CQRS is a pattern that separates the read and write operations of an application. An event aggregator is often used on the write side of the CQRS pattern to handle domain events generated by write operations. These events are published to the event aggregator, which can then update the read models or trigger further actions. The event aggregator plays a crucial role in ensuring that domain events are processed consistently and used to update the query side of the application.
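The CQRS flow described in the last bullet can be sketched in a few lines of C#. This is a minimal in-process illustration, not the article's implementation: the names `OrderPlaced`, `DomainEventAggregator`, `OrderCommandHandler`, and `OrderSummaryReadModel` are all hypothetical, and persistence of the write model is left out.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical domain event raised by the write side.
public sealed class OrderPlaced
{
    public OrderPlaced(string orderId, decimal total) { OrderId = orderId; Total = total; }
    public string OrderId { get; }
    public decimal Total { get; }
}

// Minimal in-process aggregator: routes each event to the handlers registered for its type.
public sealed class DomainEventAggregator
{
    private readonly Dictionary<Type, List<Action<object>>> handlers =
        new Dictionary<Type, List<Action<object>>>();

    public void Subscribe<TEvent>(Action<TEvent> handler)
    {
        if (!handlers.TryGetValue(typeof(TEvent), out var list))
        {
            list = new List<Action<object>>();
            handlers[typeof(TEvent)] = list;
        }
        list.Add(e => handler((TEvent)e));
    }

    public void Publish<TEvent>(TEvent domainEvent)
    {
        if (!handlers.TryGetValue(typeof(TEvent), out var list)) return;
        foreach (var handler in list.ToArray()) handler(domainEvent);
    }
}

// Write side: handles a command and publishes the resulting domain event.
public sealed class OrderCommandHandler
{
    private readonly DomainEventAggregator events;
    public OrderCommandHandler(DomainEventAggregator events) { this.events = events; }

    public void PlaceOrder(string orderId, decimal total)
    {
        // Saving the order to the write store is omitted in this sketch.
        events.Publish(new OrderPlaced(orderId, total));
    }
}

// Read side: a query model kept up to date from published events.
public sealed class OrderSummaryReadModel
{
    public Dictionary<string, decimal> Totals { get; } = new Dictionary<string, decimal>();
    public void Apply(OrderPlaced e) { Totals[e.OrderId] = e.Total; }
}

public static class CqrsDemo
{
    // Wires the read model to the aggregator, runs one command, and returns the projected total.
    public static decimal Run()
    {
        var aggregator = new DomainEventAggregator();
        var readModel = new OrderSummaryReadModel();
        aggregator.Subscribe<OrderPlaced>(readModel.Apply);

        new OrderCommandHandler(aggregator).PlaceOrder("A-1", 25m);
        return readModel.Totals["A-1"];
    }
}
```

The write side never references the read model directly. Both depend only on the aggregator, which is the same indirection Fowler's definition describes.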
Azure Event Grid is another example that adheres to this principle. Using Event Grid, you can connect cloud resources that generate events (publishers) with resources that process those events (subscribers). The illustration below provides a visual representation of this concept.
Azure Event Grid empowers you to create data pipelines utilizing device data, integrate applications, and construct serverless architectures driven by events. It facilitates the publication and subscription of messages using MQTT v3.1.1 and v5.0 protocols, catering to Internet of Things (IoT) solutions. Through HTTP, Event Grid enables the development of event-driven solutions where a publisher service notifies subscriber applications about changes in its system state (events). Event Grid offers two delivery methods: push delivery, where events are sent to subscribers based on configuration, and pull delivery, where subscribers connect to Event Grid to retrieve events. Additionally, Event Grid supports the CloudEvents 1.0 specification, ensuring interoperability across various systems.
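To make the CloudEvents 1.0 support more concrete, here is a rough sketch of what an event envelope in that format looks like when built by hand with `System.Text.Json`. The class name `CloudEventSketch` and the event type, source, and payload passed in by the caller are assumptions for illustration. The sketch does not call any Event Grid SDK.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

public static class CloudEventSketch
{
    // Builds a JSON envelope using CloudEvents 1.0 attribute names.
    // The type, source, and data values are supplied by the caller and are illustrative only.
    public static string Build(string type, string source, object data)
    {
        var envelope = new Dictionary<string, object>
        {
            // Required context attributes in CloudEvents 1.0
            ["specversion"] = "1.0",
            ["id"] = Guid.NewGuid().ToString(),
            ["source"] = source,
            ["type"] = type,
            // Optional attributes
            ["time"] = DateTimeOffset.UtcNow.ToString("o"),
            ["datacontenttype"] = "application/json",
            ["data"] = data
        };
        return JsonSerializer.Serialize(envelope);
    }
}
```

A publisher could call `CloudEventSketch.Build("com.example.order.created", "/orders", payload)` and post the resulting JSON over HTTP. The endpoint and authentication details depend on the Event Grid configuration and are not shown here.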
Let's see how we can reuse this pattern in an actual example. I wrote the components and services of the solution in C#. As the primary event bus, I used Azure Service Bus.
I designed the solution so that it is easy to extend with providers for Apache Kafka and RabbitMQ.
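One way to get that kind of extensibility is to hide each broker behind a small provider interface. The sketch below is an assumption about how such a seam could look, not the article's actual code: `IEventBusProvider`, `EventEnvelope`, and `InMemoryEventBusProvider` are hypothetical names, and the in-memory provider only stands in for a real Service Bus, Kafka, or RabbitMQ implementation.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical broker-neutral message: a label used for routing plus a text body.
public sealed class EventEnvelope
{
    public EventEnvelope(string label, string body) { Label = label; Body = body; }
    public string Label { get; }
    public string Body { get; }
}

// Hypothetical seam that each broker-specific provider would implement.
public interface IEventBusProvider
{
    Task SendAsync(EventEnvelope envelope);
    void Subscribe(Func<EventEnvelope, Task> handler);
}

// In-memory stand-in, useful for tests; a real provider would wrap a broker client instead.
public sealed class InMemoryEventBusProvider : IEventBusProvider
{
    private readonly List<Func<EventEnvelope, Task>> handlers =
        new List<Func<EventEnvelope, Task>>();

    public void Subscribe(Func<EventEnvelope, Task> handler) { handlers.Add(handler); }

    public async Task SendAsync(EventEnvelope envelope)
    {
        // Deliver to every subscriber in registration order.
        foreach (var handler in handlers.ToArray())
        {
            await handler(envelope);
        }
    }
}

public static class ProviderDemo
{
    // Subscribes one handler, sends one message, and returns what the handler saw.
    public static async Task<List<string>> RunAsync()
    {
        IEventBusProvider provider = new InMemoryEventBusProvider();
        var seen = new List<string>();
        provider.Subscribe(e => { seen.Add(e.Label + ":" + e.Body); return Task.CompletedTask; });
        await provider.SendAsync(new EventEnvelope("First", "hello"));
        return seen;
    }
}
```

Code that depends only on `IEventBusProvider` would not need to change when a different broker is plugged in.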
The first component is ServiceBusManager. It contains a method to subscribe to the messages. You can see the code below:
public async Task RegisterOnReceiveMessages(string subscription, Dictionary<string, Func<Message, bool>> subscriptionToLabelHandler, CancellationToken cancellationToken)
{
    var taskCompletionSource = new TaskCompletionSource<bool>();
    SubscriptionClient subscriptionClient = GetSubscriptionClient(subscription);
    RegisterCancellationToken(cancellationToken, subscriptionClient, taskCompletionSource);
    var messageHandlerOptions = MessageHandlerOptions;
    // Register the function that will process messages
    subscriptionClient.RegisterMessageHandler(async (message, token) =>
    {
        // Process the message
        Console.WriteLine($"Received message: Label:{message.Label} | SequenceNumber:{message.SystemProperties.SequenceNumber} | Body:{Encoding.UTF8.GetString(message.Body)}");
        // Dispatch to the handler registered for this label
        subscriptionToLabelHandler[message.Label](message);
        // Complete the message so that it is not received again.
        await subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);
    }, messageHandlerOptions);
    // Keep the registration alive until the task completion source is completed
    await taskCompletionSource.Task;
}
It also contains a method that allows other components to send messages:
public async Task SendMessage(string label, string messageContent)
{
    try
    {
        var topicClient = new TopicClient(serviceBusSettings.ConnectionString, serviceBusSettings.TopicName);
        var messageData = GetMessageContent(label, messageContent);
        var message = new Message
        {
            Body = messageData,
            Label = label,
        };
        // Send the message to the topic
        await topicClient.SendAsync(message);
        await topicClient.CloseAsync();
    }
    catch (Exception exception)
    {
        Console.WriteLine($"{DateTime.Now} > Exception: {exception.Message}");
    }
}
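The receive handler shown earlier looks up a handler by label and discards the `bool` it returns. As a hedged variation, the sketch below shows one way the lookup and the return value could drive what happens to a message. It is broker-independent: `LabelDispatcher` and `DispatchOutcome` are hypothetical names, and handlers take the message body as a plain string rather than a Service Bus `Message`.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical result of trying to handle one message.
public enum DispatchOutcome { Completed, Abandoned, NoHandler }

public static class LabelDispatcher
{
    // Finds the handler for the label and maps its bool result to an outcome.
    public static DispatchOutcome Dispatch(
        IReadOnlyDictionary<string, Func<string, bool>> handlers,
        string label,
        string body)
    {
        // A missing or unknown label is reported instead of throwing KeyNotFoundException.
        if (label == null || !handlers.TryGetValue(label, out var handler))
        {
            return DispatchOutcome.NoHandler;
        }

        // true means the handler succeeded; false means the message should be retried.
        return handler(body) ? DispatchOutcome.Completed : DispatchOutcome.Abandoned;
    }
}
```

A caller could then complete the message on `Completed` and abandon or dead-letter it otherwise. Which of those is appropriate depends on the solution's retry policy.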
EventAggregator
The second component of our system is the EventAggregator class. It contains:
Configuration: maps each event handler label to its event handler function.
private Dictionary<string, Func<Message, bool>> SubscriptionToLabelFuncs => new Dictionary<string, Func<Message, bool>>
{
    { "First", DoFirstHandler },
    { "Second", DoSecondHandler }
};
....
public bool DoFirstHandler(Message message)
{
    // Get message body example
    var data = GetMessageBody(message);
    return true;
}
/// <summary>
/// Second message handler example.
/// </summary>
/// <param name="message">The message.</param>
/// <returns></returns>
public bool DoSecondHandler(Message message)
{
    // Get message body example
    var data = GetMessageBody(message);
    return true;
}
And the method that runs the aggregation process in the background.
/// <summary>
/// Starts the aggregation.
/// </summary>
/// <returns></returns>
public async Task StartAgregating()
{
    this.cancellationToken = new CancellationTokenSource().Token;
    await serviceBusManager.RegisterOnReceiveMessages(Subscription, SubscriptionToLabelFuncs, cancellationToken);
}
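In the snippet above, the `CancellationTokenSource` is created inline and not stored, so nothing can later request cancellation through it. If a stoppable background process is wanted, one option is to keep the source as a field. The sketch below illustrates this idea under stated assumptions: `StoppableAggregator` is a hypothetical class, and `ListenUntilCancelled` is only a stand-in for `serviceBusManager.RegisterOnReceiveMessages`, whose internals are not shown in the article.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public sealed class StoppableAggregator : IDisposable
{
    // Keeping the source (not just its token) is what makes Stop() possible.
    private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();

    // Stand-in for the real subscription call: the task completes once the token is cancelled.
    private static Task ListenUntilCancelled(CancellationToken token)
    {
        var completion = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        token.Register(() => completion.TrySetResult(true));
        return completion.Task;
    }

    // Starts listening; the returned task finishes after Stop() is called.
    public Task StartAsync()
    {
        return ListenUntilCancelled(cancellationTokenSource.Token);
    }

    // Requests cancellation, which lets the listening task complete.
    public void Stop()
    {
        cancellationTokenSource.Cancel();
    }

    public void Dispose()
    {
        cancellationTokenSource.Dispose();
    }
}
```

A host could call `StartAsync()` at startup, keep the returned task, and call `Stop()` during shutdown before awaiting that task.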
Here you can find a complete implementation of the Service Bus Manager.
Conclusion
In this article, I've explained one way to organize event communication in your microservice solution. If you want to build more advanced skills in event-driven architecture, you can subscribe to Architecture Digest and enroll in the course Building Event-Driven and Microservices Architecture in Azure.
This article explored different event aggregation patterns, including the Event Bus Pattern, Event-driven Microservices Pattern, and CQRS Pattern. These patterns offer powerful ways to facilitate communication and coordination between microservices, ensuring loose coupling and scalability.