Unlocking the Power of Postgres Listen/Notify: Building a Scalable Solution With Spring Boot Integration
Unleash the potential of Postgres Listen/Notify paired with Spring Boot for streamlined and scalable real-time communication.
In this article, we will examine Postgres' "Listen/Notify" functionality and try to answer the following simple questions:
- Why leverage Postgres as a message broker?
- When is it most beneficial to do so?
- How can you seamlessly integrate it with Spring Boot?
This article dives into strategies for building scalable solutions, prioritizing not just the efficiency of your system but also the integrity of your data. Serving as a fundamental guide, it aims to provide a working understanding of Postgres Listen/Notify and to encourage you to apply it to your own use cases.
When and Why To Use Postgres as a Message Broker
Before diving further into the subject, let's establish some foundational clarifications.
While the Postgres LISTEN/NOTIFY feature is powerful for real-time communication within a database, it is not a direct alternative to or replacement for comprehensive messaging solutions like RabbitMQ, Apache Kafka, IBM MQ, Redis Pub/Sub, Azure Event Hubs, and others.
Unlike dedicated message queue systems, LISTEN/NOTIFY is specifically tailored to database-centric scenarios and may lack certain advanced features such as distributed data storage, fault tolerance, and the ability to handle massive-scale data streaming.
So, Why Use It?
- Listen/Notify is already part of the Postgres database by default.
- Setting it up is simpler compared to configuring and managing a separate Apache Kafka cluster, for example.
- Listen/Notify in Postgres reduces application overhead by eliminating the need for an additional layer of complexity and external dependency.
- It reduces infrastructure complexity and uses resources more efficiently: with no separate messaging system to deploy and operate, it is a more straightforward solution for scenarios where simplicity and minimal setup are priorities.
And When To Use It
Listen/Notify works well for handling tasks when all the necessary information is already in the database. It sends messages to background processes, letting them know there's something new to work on.
These are some basic use cases where Listen/Notify might be a wise choice:
- When there is a need to send real-time notifications upon specific database events.
- When background processes need to be triggered immediately upon changes in the database.
- When building event-driven architectures.
- When building a scalable publish/subscribe mechanism where multiple components can subscribe to specific events.
- When aiming to simplify your architecture and reduce dependencies by handling messaging directly within PostgreSQL instead of relying on external message brokers.
How To Get the Most Out of It and Spring Boot Implementation Example
To get the most out of this feature, we will need to use it in combination with some other useful features offered by Postgres.
Scenario Description
Let's set up a simple real-life scenario in which we can take advantage of the Listen/Notify feature.
In this use case, we aim to provide a straightforward REST endpoint, allowing clients to seamlessly dispatch SMS messages to customers.
The endpoint accepts a list of JSON objects, each containing essential details such as the phone number and the message content.
Upon a successful client request to this endpoint, our goal is to efficiently submit all these objects into a queue and initiate a background process. This process involves invoking an additional SMS API responsible for managing the delivery of the SMS messages.
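The request items described above can be modelled as a small value class. The sketch below is only an illustration: the class name SmsRequest, its field names, and the phone-number pattern are assumptions that do not come from the article, and the 255-character limit mirrors the VARCHAR(255) columns of the queue table used in this article.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/** Hypothetical request item: one SMS to enqueue. All names are assumptions. */
final class SmsRequest {
    // Loose check: optional '+', then 8 to 15 digits (an assumption for this sketch).
    private static final Pattern PHONE = Pattern.compile("\\+?[0-9]{8,15}");
    // Mirrors the VARCHAR(255) CONTENT column of the queue table.
    static final int MAX_CONTENT_LENGTH = 255;

    final String phoneNumber;
    final String content;

    SmsRequest(String phoneNumber, String content) {
        this.phoneNumber = phoneNumber;
        this.content = content;
    }

    /** True when the item has a plausible phone number and non-blank content that fits the column. */
    boolean isValid() {
        return phoneNumber != null
                && PHONE.matcher(phoneNumber).matches()
                && content != null
                && !content.trim().isEmpty()
                && content.length() <= MAX_CONTENT_LENGTH;
    }

    /** Keeps only the items that are safe to insert into the queue table. */
    static List<SmsRequest> keepValid(List<SmsRequest> requests) {
        return requests.stream().filter(SmsRequest::isValid).collect(Collectors.toList());
    }
}
```

Rejecting malformed items before they reach the queue means the background process only ever sees rows it can actually hand to the SMS API.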
How To Handle This Use Case
As mentioned, to get the most out of the Listen/Notify feature, we will combine it with a few other Postgres features.
Database
To begin, let's create a straightforward internal queue table. This table will serve as the repository for all SMS messages awaiting processing and transmission by our Spring Boot application.
CREATE TABLE NOTIFICATIONS_QUEUE
(
    ID BIGSERIAL PRIMARY KEY,
    PHONE_NUMBER VARCHAR(255),
    CONTENT VARCHAR(255),
    STATUS VARCHAR(255),
    CREATED_AT TIMESTAMP NOT NULL DEFAULT NOW()
);
Now, let's create a Postgres trigger function.
CREATE OR REPLACE FUNCTION notify_new_notification_in_queue()
RETURNS TRIGGER AS $$
BEGIN
IF TG_OP = 'INSERT' THEN
PERFORM pg_notify('notifications', NEW.id::text);
ELSIF TG_OP = 'UPDATE' THEN
PERFORM pg_notify('notifications', NEW.id::text);
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER notification_added_in_queue
AFTER INSERT OR UPDATE ON notifications_queue
FOR EACH ROW
EXECUTE FUNCTION notify_new_notification_in_queue();
This code sets up Postgres to raise a notification whenever a record is inserted into or updated in the NOTIFICATIONS_QUEUE table.
The notification is sent via the pg_notify function on the notifications channel, and its payload is the ID of the inserted or updated record.
With the database changes done, let's jump into the Spring Boot code.
Spring Boot Implementation
The Spring Boot implementation is quite straightforward.
First, let's create a NotificationsListenerConfiguration class.
@Configuration
@Slf4j
@RequiredArgsConstructor
public class NotificationsListenerConfiguration {
private final NotificationListener notificationListener;
@Bean
CommandLineRunner startListener(NotificationHandler handler) {
return (args) -> {
log.info("Starting to watch for new notifications in the queue...");
notificationListener.listenForNotifications(handler);
};
}
}
Next, let's follow up with the actual NotificationListener class:
@Component
@Slf4j
@RequiredArgsConstructor
public class NotificationListener {
@Value("${notificationListenerTimeout:10000}")
private int listenerTimeout;
private static final String NOTIFICATIONS_CHANNEL = "notifications";
private final DataSource dataSource;
private final NotificationQueueDatabaseService queueDatabaseService;
@Async
public void listenForNotifications(Consumer<PGNotification> consumer) {
while (true) {
try (Connection c = dataSource.getConnection()) {
PGConnection pgconn = c.unwrap(PGConnection.class);
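// Close the statement once LISTEN has been issued; the subscription stays active on the connection.
try (Statement st = c.createStatement()) {
st.execute("LISTEN " + NOTIFICATIONS_CHANNEL);
}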
log.info("Connection established: Listening for notifications on channel: [{}]", NOTIFICATIONS_CHANNEL);
queueDatabaseService.notifyAboutRemainingNotificationsInQueue();
log.info("Notified about hanging notifications in the queue...");
while (true) {
PGNotification[] nts = pgconn.getNotifications(listenerTimeout);
if (nts == null) {
continue;
}
for (PGNotification nt : nts) {
consumer.accept(nt);
}
}
} catch (Exception e) {
log.warn("Error occurred while listening for notifications, attempting to reconnect...", e);
}
}
}
}
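This code defines a NotificationListener class that asynchronously listens for notifications on a Postgres channel named "notifications". The call to notifyAboutRemainingNotificationsInQueue() after each connection is established matters because Postgres only delivers notifications to sessions that are listening at the moment they are sent; rows queued while the listener was down would otherwise go unnoticed. If an error occurs while listening for notifications, the code logs a warning, reconnects, and continues listening. Note that @Async only takes effect when asynchronous execution is enabled, for example with @EnableAsync on a configuration class.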
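When the database stays unavailable for a while, spacing reconnect attempts out is gentler than retrying at a constant rate. The helper below is a minimal sketch of exponential backoff; the class name ReconnectBackoff and its methods are hypothetical and not part of the article's code.

```java
import java.time.Duration;

/** Hypothetical helper: computes how long to wait before the next reconnect attempt. */
final class ReconnectBackoff {
    private final long baseMillis;
    private final long maxMillis;
    private int failures = 0;

    ReconnectBackoff(Duration base, Duration max) {
        this.baseMillis = base.toMillis();
        this.maxMillis = max.toMillis();
    }

    /** Call after a failed attempt; the delay doubles each time and is capped at max. */
    Duration nextDelay() {
        // Limit the shift so the doubling cannot overflow for realistic base delays.
        int shift = Math.min(failures, 20);
        failures++;
        long delay = Math.min(maxMillis, baseMillis << shift);
        return Duration.ofMillis(delay);
    }

    /** Call once a connection is established and LISTEN has succeeded. */
    void reset() {
        failures = 0;
    }
}
```

One way to use it, under these assumptions, is to call Thread.sleep(backoff.nextDelay().toMillis()) in the listener's catch block and backoff.reset() right after the LISTEN statement succeeds.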
Now, we need just one more class to get this up and running.
@Component
@Slf4j
@RequiredArgsConstructor
public class NotificationHandler implements Consumer<PGNotification>{
private final NotificationQueueDatabaseService queueDatabaseService;
private final NotificationService notificationService;
@Override
@Transactional
public void accept(PGNotification pgNotification) {
log.info("Notification with id: [{}] received...", pgNotification.getParameter());
Long notificationId = getNotificationIdFromPgNotificationParameter(pgNotification.getParameter());
if (notificationId == null) {
return;
}
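// Treat a null result as "lock not obtained" to avoid a NullPointerException on unboxing.
boolean lockObtained = Boolean.TRUE.equals(queueDatabaseService.obtainLockForNotification(notificationId));
if (!lockObtained) {
log.info("Notification with id: [{}] is already being processed...", pgNotification.getParameter());
return;
}
DBNotificationQueue notification;
try {
notification = queueDatabaseService.findUnlockedNotificationById(notificationId);
} catch (Exception exception) {
//handle some exception
log.error("Could not load notification with id: [{}]", notificationId, exception);
return;
}
if (notification == null) {
// Nothing to send: the row was already processed or removed.
return;
}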
try {
notificationService.sendNotification(notification);
} catch (Exception exception) {
//handle send notification exception
} finally {
//delete notification from the queue or update status
}
}
private Long getNotificationIdFromPgNotificationParameter(String pgNotificationParameter) {
try {
return Long.parseLong(pgNotificationParameter);
} catch (Exception exception) {
log.error("Error occurred while parsing notification id from pgNotificationParameter : [{}]", pgNotificationParameter, exception);
return null;
}
}
}
Please note that this code is intentionally simplified to give you a basic understanding of how a notification handler should be structured. In essence, whenever a record is added to or updated in the NOTIFICATIONS_QUEUE table, this method receives the resulting notification and starts the processing sequence.
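One detail is worth guarding against: the trigger fires on UPDATE as well as INSERT, so marking a row as processed sends another notification for the same ID. A status check before sending keeps the handler from acting on that row twice. The sketch below assumes hypothetical values (PENDING, SENT, FAILED) for the STATUS column, since the article does not define them, and it assumes that failed rows are not retried automatically.

```java
import java.util.Locale;

/** Hypothetical values for the STATUS column; the article does not specify them. */
enum QueueStatus {
    PENDING, SENT, FAILED;

    /** Parses the raw column value; null or blank is treated as PENDING (an assumption). */
    static QueueStatus fromColumn(String raw) {
        if (raw == null || raw.trim().isEmpty()) {
            return PENDING;
        }
        // Throws IllegalArgumentException for values outside the enum.
        return valueOf(raw.trim().toUpperCase(Locale.ROOT));
    }

    /** Only rows that have not been handled yet should reach the SMS API. */
    boolean needsProcessing() {
        return this == PENDING;
    }
}
```

In the handler, a check along the lines of QueueStatus.fromColumn(notification.getStatus()).needsProcessing() could run before sendNotification is called; getStatus() is an assumed accessor on the entity.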
How To Scale Properly
To enhance the scalability of this code, we've used a built-in Postgres feature known as an advisory lock.
Examining the database method queueDatabaseService.obtainLockForNotification(notificationId), you'll find it structured like this:
@Query(value = "SELECT pg_try_advisory_xact_lock(?1)", nativeQuery = true)
Boolean obtainLockForNotification(Long id);
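This SQL statement attempts to acquire a transaction-level advisory lock, using the notification ID passed to the method as the lock key. It does not wait: it returns true immediately if the lock was obtained and false if another transaction already holds it.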
Advisory Lock
An advisory lock is a mechanism in PostgreSQL that allows applications to coordinate and synchronize activities, providing a way to control access to shared resources. In simpler terms, this statement is trying to obtain a specific type of lock to manage concurrent access to certain operations in a transaction.
Combined with the @Transactional annotation on the accept method, this configuration ensures that, at any given moment, only one instance of the application is processing a given notification. The lock is transaction-scoped, so Postgres releases it automatically on commit or rollback; for that reason, the handler should delete the row or update its status inside the same transaction. This precaution is crucial; without it, there is a risk of sending duplicate notifications to customers, as multiple instances might attempt to handle the same notification concurrently.
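The observable behaviour of the try-lock can be modelled in plain Java to make the race easier to reason about. This is an in-memory analogy only, not how Postgres implements advisory locks; the class AdvisoryLockModel and the transaction identifiers are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;

/** In-memory analogy of transaction-level advisory locks; names are hypothetical. */
final class AdvisoryLockModel {
    // Lock key -> identifier of the transaction currently holding it.
    private final ConcurrentHashMap<Long, String> holders = new ConcurrentHashMap<>();

    /** Like pg_try_advisory_xact_lock: never blocks; false means another transaction holds the key. */
    boolean tryLock(long key, String txId) {
        String current = holders.putIfAbsent(key, txId);
        // A transaction that already holds the key may request it again successfully.
        return current == null || current.equals(txId);
    }

    /** Like commit or rollback: every lock held by the transaction is released. */
    void endTransaction(String txId) {
        holders.values().removeIf(txId::equals);
    }
}
```

If two application instances receive the same notification ID, the first tryLock call wins and the second returns false at once, which corresponds to the "already being processed" branch in the handler. Once the first transaction ends, the key is free again, so whatever marks the row as handled has to happen before that point.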
Conclusion
This article highlights the simplicity and effectiveness of Postgres "Listen/Notify" for real-time database communication, demonstrated through a practical Spring Boot example. I hope this encourages developers to consider and implement this approach in their projects for streamlined solutions.
Opinions expressed by DZone contributors are their own.