Spring Cloud Stream Channel Interceptor
A Channel Interceptor is used to capture a message before being sent or received in order to view or modify it. Learn how a channel interceptor works and how to use it.
Introduction
A channel interceptor captures a message before it is sent or received so that it can be inspected or modified. It keeps the code well structured when we want to add extra message processing or attach additional data of a purely technical nature, without touching the business code.
Message interceptors are used in frameworks like Spring Cloud Sleuth and Spring Security to propagate the tracing and security contexts through a message queue: headers are added to the message on the producer side, then read on the consumer side to restore the context.
A message interceptor plays a role similar to that of a Servlet filter or an aspect, and it can be added transparently to both the message producer and the consumer:
- When the output channel sends a new message, the message goes through the channel interceptors before being delivered to the broker.
- When a new message is received by the consumer, the message goes through the input channel interceptors before being handled by the message listener.
How It Works
This part explains roughly how Spring Cloud Stream works when sending and receiving a message. The descriptions below do not include the binders of specific broker implementations such as Kafka or RabbitMQ.
Below is the part of the ChannelInterceptor interface source code that interests us:
public interface ChannelInterceptor {
    /**
     * Invoked before the Message is actually sent to the channel.
     * This allows for modification of the Message if necessary.
     * If this method returns {@code null} then the actual
     * send invocation will not occur.
     */
    default Message<?> preSend(Message<?> message, MessageChannel channel) {
        return message;
    }
    /**
     * Invoked immediately after the send invocation. The boolean
     * value argument represents the return value of that invocation.
     */
    default void postSend(Message<?> message, MessageChannel channel, boolean sent) {
    }
    /**
     * Invoked after the completion of a send regardless of any exception that
     * have been raised thus allowing for proper resource cleanup.
     * <p>Note that this will be invoked only if {@link #preSend} successfully
     * completed and returned a Message, i.e. it did not return {@code null}.
     * @since 4.1
     */
    default void afterSendCompletion(
            Message<?> message, MessageChannel channel, boolean sent, Exception ex) {
    }
    ...
}
Sending Message
1. When sending a message, the MessageChannel calls the method MessageConverter.fromMessage() in order to perform message conversion.
2. The MessageChannel calls the method ChannelInterceptor.preSend() with the message object returned from step 1. This method also returns a message object.
3. The MessageChannel calls the broker API implementation in order to send the message returned from step 2.
4. The MessageChannel calls the method ChannelInterceptor.postSend().
5. Finally, the MessageChannel calls the method ChannelInterceptor.afterSendCompletion(), passing the exception if one was thrown while sending the message.
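The ordering of the interceptor callbacks can be illustrated with a small plain-Java model. This is only a sketch under stated assumptions: SketchInterceptor, RecordingInterceptor and SketchChannel are hypothetical stand-ins that use String payloads, not Spring's classes. The conversion step is omitted, and the broker is reduced to a Consumer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class SendFlowDemo {
    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        SketchChannel channel = new SketchChannel(payload -> events.add("broker:" + payload));
        channel.addInterceptor(new RecordingInterceptor(events));
        boolean sent = channel.send("hello");
        // prints: true [preSend, broker:HELLO, postSend, afterSendCompletion]
        System.out.println(sent + " " + events);
    }
}

/** Hypothetical, simplified stand-in for ChannelInterceptor (String payloads only). */
interface SketchInterceptor {
    default String preSend(String message) { return message; }
    default void postSend(String message, boolean sent) { }
    default void afterSendCompletion(String message, boolean sent, Exception ex) { }
}

/** Records each callback and upper-cases the payload in preSend. */
class RecordingInterceptor implements SketchInterceptor {
    private final List<String> events;
    RecordingInterceptor(List<String> events) { this.events = events; }
    @Override public String preSend(String message) {
        events.add("preSend");
        return message.toUpperCase();
    }
    @Override public void postSend(String message, boolean sent) { events.add("postSend"); }
    @Override public void afterSendCompletion(String message, boolean sent, Exception ex) {
        events.add("afterSendCompletion");
    }
}

/** Hypothetical channel model; the "broker" is just a Consumer. */
class SketchChannel {
    private final List<SketchInterceptor> interceptors = new ArrayList<>();
    private final Consumer<String> broker;
    SketchChannel(Consumer<String> broker) { this.broker = broker; }
    void addInterceptor(SketchInterceptor interceptor) { interceptors.add(interceptor); }

    boolean send(String message) {
        String current = message;
        for (SketchInterceptor i : interceptors) {
            current = i.preSend(current);          // step 2: may replace the message
            if (current == null) return false;     // a null result cancels the send
        }
        boolean sent = false;
        Exception failure = null;
        try {
            broker.accept(current);                // step 3: hand over to the broker
            sent = true;
            for (SketchInterceptor i : interceptors) i.postSend(current, sent); // step 4
        } catch (RuntimeException e) {
            failure = e;
            throw e;
        } finally {
            // step 5: always runs once preSend succeeded, with the exception if any
            for (SketchInterceptor i : interceptors) i.afterSendCompletion(current, sent, failure);
        }
        return sent;
    }
}
```

The model deliberately ignores details such as interceptor ordering and partial completion when one of several interceptors cancels the send.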
Receiving Message
Message reception follows a similar process to message sending when the annotation @StreamListener is used instead of pulling messages. When a new message is received, the SubscribableChannel also calls the method ChannelInterceptor.preSend().
Keep in mind that ChannelInterceptor.preReceive() is invoked only when using a PollableChannel, not with @StreamListener.
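The difference between the two reception styles can be sketched with a plain-Java model. PushChannel and PullChannel are hypothetical names used for illustration only. They mimic a subscribable channel that pushes messages to a listener and a pollable channel that the consumer queries explicitly.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

class ReceiveFlowDemo {
    public static void main(String[] args) {
        List<String> calls = new ArrayList<>();
        PushChannel push = new PushChannel(calls, payload -> calls.add("listener:" + payload));
        push.send("a");      // delivery to a subscribed listener
        PullChannel pull = new PullChannel(calls);
        pull.send("b");      // the message is queued
        pull.receive();      // the consumer asks for it explicitly
        // prints: [preSend, listener:a, preSend, preReceive]
        System.out.println(calls);
    }
}

/** Push model: delivery is a send on the input channel, so only preSend runs. */
class PushChannel {
    private final List<String> calls;
    private final Consumer<String> listener;
    PushChannel(List<String> calls, Consumer<String> listener) {
        this.calls = calls;
        this.listener = listener;
    }
    void send(String payload) {
        calls.add("preSend");
        listener.accept(payload);
    }
}

/** Pull model: preReceive runs only when the consumer calls receive(). */
class PullChannel {
    private final List<String> calls;
    private final Queue<String> queue = new ArrayDeque<>();
    PullChannel(List<String> calls) { this.calls = calls; }
    void send(String payload) {
        calls.add("preSend");
        queue.add(payload);
    }
    String receive() {
        calls.add("preReceive");
        return queue.poll();
    }
}
```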
Create Your Own Channel Interceptor
In this example, we will create two global channel interceptors, one for the producer and one for the consumer. The producer interceptor adds a new header to the original message, and the consumer interceptor reads it and logs it. The same approach can be applied to the Spring Security context, the MDC, or any other information you want to propagate through a broker.
Project Preparation
If you are familiar with Spring Cloud Stream, you can build your own test project. If not, you can follow the steps described in the post Spring Cloud Stream With Kafka.
Output Channel Interceptor
Create a simple class that implements ChannelInterceptor. This class overrides the method preSend() to add a new header to the message:
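import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.messaging.support.MessageHeaderAccessor;

public class OutputChannelInterceptor implements ChannelInterceptor {
    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        MessageHeaderAccessor mutableAccessor = MessageHeaderAccessor.getMutableAccessor(message);
        // Add the message creation time header
        mutableAccessor.setHeader("Creation-Time", System.currentTimeMillis());
        return new GenericMessage<>(message.getPayload(), mutableAccessor.getMessageHeaders());
    }
}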
Input Channel Interceptor
Create a simple class that implements ChannelInterceptor. This class overrides the method preSend() to retrieve the header injected by the previous class and log it:
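import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.ChannelInterceptor;

@Slf4j // Lombok annotation that generates the "log" field
public class InputChannelInterceptor implements ChannelInterceptor {
    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        MessageHeaders messageHeaders = message.getHeaders();
        if (messageHeaders.containsKey("Creation-Time")) { // Check that the header exists
            // Retrieve and log the message creation time value
            Long creationTime = (Long) messageHeaders.get("Creation-Time");
            log.debug("The message creation time is: {}", creationTime);
        }
        return message;
    }
}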
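The same producer/consumer pairing can carry a thread-bound context instead of a timestamp. The following plain-Java sketch shows the idea without any Spring dependency. The names REQUEST_ID, onSend and onReceive and the "Request-Id" header are assumptions made for illustration, with a ThreadLocal standing in for the MDC or a security context and a Map standing in for the message headers.

```java
import java.util.HashMap;
import java.util.Map;

class ContextPropagationDemo {
    // Hypothetical thread-bound context, standing in for the MDC or a security context
    static final ThreadLocal<String> REQUEST_ID = new ThreadLocal<>();
    static final String HEADER = "Request-Id";

    /** Producer side: copy the current context into the outgoing headers. */
    static Map<String, Object> onSend(Map<String, Object> headers) {
        Map<String, Object> copy = new HashMap<>(headers);
        String id = REQUEST_ID.get();
        if (id != null) {
            copy.put(HEADER, id);
        }
        return copy;
    }

    /** Consumer side: restore the context from the incoming headers, or clear it. */
    static void onReceive(Map<String, Object> headers) {
        Object id = headers.get(HEADER);
        if (id instanceof String) {
            REQUEST_ID.set((String) id);
        } else {
            REQUEST_ID.remove();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        REQUEST_ID.set("req-42");
        Map<String, Object> wire = onSend(new HashMap<>());

        // The consumer runs on another thread, so it only sees what the headers carry
        String[] seen = new String[1];
        Thread consumer = new Thread(() -> {
            onReceive(wire);
            seen[0] = REQUEST_ID.get();
            REQUEST_ID.remove(); // avoid leaking the context to later work on this thread
        });
        consumer.start();
        consumer.join();
        System.out.println(seen[0]); // prints: req-42
    }
}
```

In a real interceptor pair, onSend would correspond to the output interceptor's preSend() and onReceive to the input interceptor's preSend().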
Interceptors Configuration
The interceptor configuration can be done in two ways.
Using Global Channel Interceptor
The first way consists of adding the annotation @GlobalChannelInterceptor to the input and output interceptors and specifying the channel name pattern for each one.
For this to work, all the input channel names should share the same prefix or suffix, and likewise for the output channel names.
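@Component // the interceptor must be registered as a Spring bean
@GlobalChannelInterceptor(patterns = {"*-out"}) // output channel name is XXX-out
public class OutputChannelInterceptor implements ChannelInterceptor {
    ....
}

@Component // the interceptor must be registered as a Spring bean
@GlobalChannelInterceptor(patterns = {"*-in"}) // input channel name is XXX-in
public class InputChannelInterceptor implements ChannelInterceptor {
    ....
}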
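To check which channel names a pattern such as "*-out" would select, a small plain-Java approximation can help. The matches helper below is hypothetical and is not the matcher Spring uses. It simply treats each '*' as "any sequence of characters", and the channel names are made up for the example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

class ChannelPatternDemo {
    /** Hypothetical helper: '*' matches any sequence of characters, everything else is literal. */
    static boolean matches(String pattern, String channelName) {
        String regex = Arrays.stream(pattern.split("\\*", -1))
                .map(Pattern::quote)                 // escape the literal parts
                .collect(Collectors.joining(".*"));  // each '*' becomes ".*"
        return channelName.matches(regex);
    }

    public static void main(String[] args) {
        // Made-up channel names following the XXX-in / XXX-out convention
        List<String> channels = List.of("orders-out", "orders-in", "payments-out");
        List<String> outputs = channels.stream()
                .filter(name -> matches("*-out", name))
                .collect(Collectors.toList());
        System.out.println(outputs); // prints: [orders-out, payments-out]
    }
}
```

A channel whose name follows neither convention is matched by neither pattern, so it gets no interceptor.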
Using Bean Post Processor
The second way consists of creating a BeanPostProcessor that injects the channel interceptors into each message channel according to its type.
Create a configuration class that injects the interceptors into all message channels:
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.cloud.stream.messaging.DirectWithAttributesChannel;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MessagingConfiguration {
    @Bean
    public BeanPostProcessor channelsConfigurer(
            InputChannelInterceptor inputChannelInterceptor,
            OutputChannelInterceptor outputChannelInterceptor) {
        return new BeanPostProcessor() {
            @Override
            public Object postProcessBeforeInitialization(Object bean, String beanName) {
                return bean;
            }
            @Override
            public Object postProcessAfterInitialization(Object bean, String beanName) {
                // Binding channels created by Spring Cloud Stream carry a "type" attribute
                if (bean instanceof DirectWithAttributesChannel) {
                    DirectWithAttributesChannel messageChannel = (DirectWithAttributesChannel) bean;
                    if ("input".equals(messageChannel.getAttribute("type"))) {
                        // The current bean is an input message channel
                        messageChannel.addInterceptor(inputChannelInterceptor);
                    } else {
                        // The current bean is an output message channel
                        messageChannel.addInterceptor(outputChannelInterceptor);
                    }
                }
                return bean;
            }
        };
    }
}