CloudEvents Kafka Binding With Spring
Utilize Kafka Protocol Binding for CloudEvents along with Spring-Kafka to easily produce and consume events in a common format.
This article shows how to use the spring-kafka library with the CloudEvents specification to quickly get you producing and consuming events with a common structure over Kafka. The spring-kafka project provides utilities and templates to interact with Kafka with minimal effort, while the CloudEvents specification describes event data in a common way and provides both a Java SDK and a library for Kafka protocol binding.
The CloudEvents Kafka protocol binding provides two content modes for transferring data.
Structured: event metadata attributes and event data are placed into the Kafka message value section using an event format.
Binary: the value of the event data MUST be placed into the Kafka message's value section as-is, with the content-type header value declaring its media type; all other event attributes MUST be mapped to the Kafka message's header section.
For more details see: https://github.com/cloudevents/spec/blob/master/kafka-protocol-binding.md#13-content-modes
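Regardless of which mode you choose, the event itself is constructed the same way with the CloudEvents Java SDK; the mode only changes how the event is written to the Kafka message. Here is a minimal sketch (the type, source, and payload are placeholder values):
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.UUID;

CloudEvent event = CloudEventBuilder.v1()
        .withId(UUID.randomUUID().toString())
        .withType("com.example.person.created")   // placeholder event type
        .withSource(URI.create("/example/people")) // placeholder source
        .withDataContentType("application/json")
        .withData("{\"name\":\"Ada\"}".getBytes(StandardCharsets.UTF_8))
        .build();
// In binary mode, the attributes above become ce_-prefixed Kafka headers
// (ce_id, ce_type, ce_source, ...) and the data bytes become the message value.
// In structured mode, the whole event is serialized into the message value
// using an event format such as JSON.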
Dependencies
In this tutorial we'll be using spring-kafka 2.5.5.RELEASE and cloudevents-kafka 2.0.0-milestone3. As of this writing, version 2 of the CloudEvents Java SDK hasn't been released, so the latest milestone is used.
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.5.5.RELEASE</version>
</dependency>
<dependency>
    <groupId>io.cloudevents</groupId>
    <artifactId>cloudevents-kafka</artifactId>
    <version>2.0.0-milestone3</version>
</dependency>
<dependency>
    <groupId>io.cloudevents</groupId>
    <artifactId>cloudevents-json-jackson</artifactId>
    <version>2.0.0-milestone3</version>
</dependency>
Binary Producer and Consumer
In order to publish and consume messages from Kafka, we'll need to create a producer and a consumer that support serializing and deserializing CloudEvents.
Producer
The first thing we need is a producer. We can start by creating a class that helps build a CloudEvent-aware producer factory:
import io.cloudevents.core.format.EventFormat;
import io.cloudevents.core.message.Encoding;
import io.cloudevents.core.provider.EventFormatProvider;
import io.cloudevents.jackson.JsonFormat;
import io.cloudevents.kafka.CloudEventSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import java.util.HashMap;
import java.util.Map;
import static io.cloudevents.kafka.CloudEventSerializer.ENCODING_CONFIG;
import static io.cloudevents.kafka.CloudEventSerializer.EVENT_FORMAT_CONFIG;
public class CloudEventKafkaProducerFactory {

    public static <K, V> DefaultKafkaProducerFactory<K, V> cloudEventKafkaProducerFactory(
            Map<String, Object> configs, Serializer<K> keySerializer, Encoding encoding) {
        return cloudEventKafkaProducerFactory(configs, keySerializer, encoding, JsonFormat.CONTENT_TYPE);
    }

    @SuppressWarnings("unchecked")
    public static <K, V> DefaultKafkaProducerFactory<K, V> cloudEventKafkaProducerFactory(
            Map<String, Object> configs,
            Serializer<K> keySerializer,
            Encoding encoding,
            String eventFormat) {
        // If present, the Kafka message header property content-type MUST be set
        // to the media type of an event format.
        if (Encoding.STRUCTURED.equals(encoding)) {
            EventFormat resolvedFormat = EventFormatProvider
                    .getInstance()
                    .resolveFormat(eventFormat);
            // Fall back to the JSON format if the requested format can't be resolved.
            if (resolvedFormat == null) {
                eventFormat = JsonFormat.CONTENT_TYPE;
            }
        }
        Map<String, Object> ceSerializerConfigs = new HashMap<>();
        ceSerializerConfigs.put(ENCODING_CONFIG, encoding);
        ceSerializerConfigs.put(EVENT_FORMAT_CONFIG, eventFormat);
        CloudEventSerializer cloudEventSerializer = new CloudEventSerializer();
        // isKey is always false: the CloudEvent is the message value, never the key.
        cloudEventSerializer.configure(ceSerializerConfigs, false);
        return new DefaultKafkaProducerFactory<>(configs, keySerializer, (Serializer<V>) cloudEventSerializer);
    }
}
This class will allow us to create a DefaultKafkaProducerFactory that supports either binary or structured encoding. You can also specify the event format depending on your needs.
You can instantiate a ProducerFactory that serializes your data as a binary-mode cloud event using the CloudEventKafkaProducerFactory. Simply provide your producer configs, the key serializer, and the encoding type, in this case Encoding.BINARY.
DefaultKafkaProducerFactory<String, CloudEvent> binaryProducerFactory = CloudEventKafkaProducerFactory.cloudEventKafkaProducerFactory(
    producerConfigs,
    new StringSerializer(),
    Encoding.BINARY);
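Here, producerConfigs is just a standard map of Kafka producer properties. A minimal sketch, assuming a local broker (the bootstrap address is a placeholder):
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.HashMap;
import java.util.Map;

Map<String, Object> producerConfigs = new HashMap<>();
// Placeholder broker address; point this at your cluster.
producerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// No serializer entries are needed here: the factory supplies the key
// serializer and the CloudEventSerializer for values.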
Consumer
We'll also need a class to help construct a consumer that supports CloudEvents:
import io.cloudevents.core.format.EventFormat;
import io.cloudevents.core.message.Encoding;
import io.cloudevents.core.provider.EventFormatProvider;
import io.cloudevents.jackson.JsonFormat;
import io.cloudevents.kafka.CloudEventDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.HashMap;
import java.util.Map;
import static io.cloudevents.kafka.CloudEventSerializer.ENCODING_CONFIG;
import static io.cloudevents.kafka.CloudEventSerializer.EVENT_FORMAT_CONFIG;
public class CloudEventKafkaConsumerFactory {

    public static <K, V> DefaultKafkaConsumerFactory<K, V> consumerFactory(
            Map<String, Object> configs,
            Deserializer<K> keyDeserializer,
            Encoding encoding) {
        return consumerFactory(configs,
                keyDeserializer,
                encoding,
                JsonFormat.CONTENT_TYPE);
    }

    @SuppressWarnings("unchecked")
    public static <K, V> DefaultKafkaConsumerFactory<K, V> consumerFactory(
            Map<String, Object> configs,
            Deserializer<K> keyDeserializer,
            Encoding encoding,
            String eventFormat) {
        // If present, the Kafka message header property content-type MUST be set
        // to the media type of an event format.
        if (Encoding.STRUCTURED.equals(encoding)) {
            EventFormat resolvedFormat = EventFormatProvider
                    .getInstance()
                    .resolveFormat(eventFormat);
            // Fall back to the JSON format if the requested format can't be resolved.
            if (resolvedFormat == null) {
                eventFormat = JsonFormat.CONTENT_TYPE;
            }
        }
        Map<String, Object> ceDeserializerConfigs = new HashMap<>();
        ceDeserializerConfigs.put(ENCODING_CONFIG, encoding);
        ceDeserializerConfigs.put(EVENT_FORMAT_CONFIG, eventFormat);
        CloudEventDeserializer cloudEventDeserializer = new CloudEventDeserializer();
        // isKey is always false: the CloudEvent is the message value, never the key.
        cloudEventDeserializer.configure(ceDeserializerConfigs, false);
        return new DefaultKafkaConsumerFactory<>(configs,
                keyDeserializer,
                (Deserializer<V>) cloudEventDeserializer);
    }
}
This class will allow us to create a DefaultKafkaConsumerFactory that supports either binary or structured encoding and properly deserializes Kafka messages into CloudEvent objects.
The process of creating a consumer is similar to that of the producer.
ConsumerFactory<String, CloudEvent> binaryConsumerFactory = CloudEventKafkaConsumerFactory.consumerFactory(
consumerConfigs,
new StringDeserializer(),
Encoding.BINARY);
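As with the producer, consumerConfigs is a standard map of Kafka consumer properties. A minimal sketch with placeholder values:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.HashMap;
import java.util.Map;

Map<String, Object> consumerConfigs = new HashMap<>();
consumerConfigs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, "cloud-events-demo");       // placeholder group id
consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// The deserializers are supplied by the factory, so none are configured here.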
Structured Producer and Consumer
Producer
You can create a ProducerFactory that serializes your data as a structured cloud event using the CloudEventKafkaProducerFactory. Simply provide your producer configs, the key serializer, and the encoding type, in this case Encoding.STRUCTURED.
DefaultKafkaProducerFactory<String, CloudEvent> structuredProducerFactory = CloudEventKafkaProducerFactory.cloudEventKafkaProducerFactory(
producerConfigs,
new StringSerializer(),
Encoding.STRUCTURED);
Consumer
The process of creating a consumer is similar to that of the producer.
ConsumerFactory<String, CloudEvent> structuredConsumerFactory = CloudEventKafkaConsumerFactory.consumerFactory(
consumerConfigs,
new StringDeserializer(),
Encoding.STRUCTURED);
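Sending through this factory works exactly as in the binary case; only the wire format differs. A short sketch, assuming a TOPIC constant and an event built as shown earlier:
KafkaTemplate<String, CloudEvent> structuredKafkaTemplate =
        new KafkaTemplate<>(structuredProducerFactory);
structuredKafkaTemplate.send(TOPIC, event);
// With Encoding.STRUCTURED and the JSON event format, the entire event
// (attributes and data) is serialized as one JSON document into the message
// value, and the content-type header is set to application/cloudevents+json.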
Example Usage
DefaultKafkaProducerFactory<String, CloudEvent> binaryProducerFactory = CloudEventKafkaProducerFactory.cloudEventKafkaProducerFactory(
    producerConfigs, new StringSerializer(), Encoding.BINARY);
KafkaTemplate<String, CloudEvent> binaryKafkaTemplate =
    new KafkaTemplate<>(binaryProducerFactory);
ConsumerFactory<String, CloudEvent> binaryConsumerFactory = CloudEventKafkaConsumerFactory.consumerFactory(
    consumerConfigs, new StringDeserializer(), Encoding.BINARY);
Consumer<String, CloudEvent> binaryConsumer = binaryConsumerFactory.createConsumer();
With this KafkaTemplate and Consumer, you'll be able to publish and consume CloudEvents with binary encoding. Here is a unit test illustrating this usage:
@Test
public void testCloudEventsBinaryMessage() throws Exception {
    binaryKafkaTemplate.send(TOPIC, getCloudEvent(person));
    ConsumerRecord<String, CloudEvent> consumerRecord = KafkaTestUtils
            .getSingleRecord(binaryConsumer, TOPIC);
    CloudEvent consumedEvent = consumerRecord.value();
    Person payload = objectMapper
            .readValue(consumedEvent.getData(), Person.class);
    assertThat(consumedEvent.getId()).isEqualTo(cloudEventID);
    assertThat(consumedEvent.getType()).isEqualTo(cloudEventType);
    assertThat(consumedEvent.getSource()).isEqualTo(cloudEventSource);
    assertThat(payload).isEqualTo(person);
    KafkaUtils.clearConsumerGroupId();
}
To view the source code for these examples, please see my GitHub repository here:
https://github.com/wkennedy/spring-kafka-cloud-events
For additional information regarding these libraries, see:
https://github.com/cloudevents
https://github.com/cloudevents/spec/blob/master/kafka-protocol-binding.md