Testing Schema Registry: Spring Boot and Apache Kafka With JSON Schema
Learn how to write tests for the Schema Registry for Apache Kafka using Spring Boot, MockSchemaRegistryClient, and EmbeddedKafka with JSON Schema.
Introduction
Apache Kafka is one of the most widely used eventing platforms, valued for its reliability and scalability. It provides the foundation for fast, scalable, fault-tolerant, event-driven microservice architectures. Any type of data can be published to and read from a Kafka topic; however, Kafka performs no schema validation out of the box, so a topic is prone to being fed junk data.
Confluent addresses this with the concept of a Schema Registry, which can be used to add type checking of messages to any Kafka installation. A message schema is registered against a topic, and the Schema Registry enforces that only messages conforming to that schema are sent to the topic.
Integrating the Schema Registry with Spring Boot Kafka producer and consumer applications is not much of a challenge, given the extensive support in Spring Boot and Spring for Apache Kafka. Testing that integration, however, often poses challenges for developers.
Since JSON is the most widespread data format, in this post I describe how to write tests for the Schema Registry for Apache Kafka using Spring Boot, MockSchemaRegistryClient, and EmbeddedKafka with JSON Schema.
Install Dependencies
Add the kafka-json-schema-serializer dependency from Confluent to pom.xml (served from Confluent's own Maven repository), along with the Spring Boot and Spring Kafka test dependencies:
<project>
  ...
  <repositories>
    <repository>
      <id>confluent</id>
      <name>confluent</name>
      <url>https://packages.confluent.io/maven/</url>
    </repository>
  </repositories>
  <dependencies>
    <dependency>
      <groupId>io.confluent</groupId>
      <artifactId>kafka-json-schema-serializer</artifactId>
      <version>6.2.1</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka-test</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>
  ...
</project>
Setting Up the Test Infrastructure
To test the Schema Registry, we won't require a full-fledged production instance of Kafka. Rather, we will use an embedded version of Kafka. To set up Embedded Kafka, we need to annotate any @SpringBootTest class or @TestConfiguration class with @EmbeddedKafka, providing the required parameters as shown below:
@SpringBootTest
@EmbeddedKafka(partitions = 1, brokerProperties = {"listeners=PLAINTEXT://localhost:8092", "port=8092"})
@ActiveProfiles("test")
@DirtiesContext
public class KafkaTestSupport {
}
io.confluent:kafka-json-schema-serializer provides a mock implementation of the Schema Registry client, called MockSchemaRegistryClient, that can be used to register and test JSON schemas. io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer should be used as the value serializer for JSON Schema-based events.
To set up a mock Schema Registry server, the producer properties should be set as follows in application.yaml in your test resources:
spring:
  kafka:
    bootstrap-servers: localhost:8092 # should match with embedded broker host port
    producer:
      value-serializer: io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer
      properties:
        schema.registry.url: mock://not-used
        auto.register.schemas: false
        use.latest.version: true
        json:
          fail.invalid.schema: true
          oneof.for.nullables: false
          write.dates.iso8601: true
A couple of points to note here:
- The mock Schema Registry URL must start with mock:// (see the sketch after this list).
- We set auto.register.schemas to false, since we will populate and register our JSON schema with the mock registry server beforehand. Setting this to false disallows auto-generation and registration of the schema via Jackson.
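As an aside on the mock:// URL: the Confluent serializer resolves such a URL to an in-memory registry keyed by the scope name that follows mock:// (here, not-used). A minimal sketch, assuming the MockSchemaRegistry test utility that ships with the Schema Registry client, of how that scoped client could be looked up directly:
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.testutil.MockSchemaRegistry;

class MockScopeLookupSketch {
    // Sketch only: "mock://not-used" is resolved by the serializer to the in-memory
    // registry scoped as "not-used"; the same scoped client can be fetched directly.
    static SchemaRegistryClient clientForMockUrl() {
        return MockSchemaRegistry.getClientForScope("not-used");
    }
}
In our tests we will not rely on this lookup; instead, we wire the serializer with our own MockSchemaRegistryClient bean, as shown in the next section.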
Let's consider our event model SampleEvent as follows:
public class SampleEvent {
    @JsonProperty(required = true)
    private Long id;
    private String name;
    @JsonProperty(required = true)
    private OffsetDateTime date;
    @JsonProperty(required = true)
    private List<SampleSubEvent> subEvents;
}

public class SampleSubEvent {
    @JsonProperty(required = true)
    private Long id;
    private String name;
}
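The hand-written schema we register later in the test (SCHEMA_STR) mirrors this model: id, date, and subEvents are required at the top level and id is required in the sub-event, matching the @JsonProperty(required = true) annotations. If you would rather bootstrap such a schema from the POJO than write it by hand, here is a rough sketch, assuming the JsonSchemaUtils helper in Confluent's JSON Schema provider module (pulled in transitively by the serializer dependency) exposes a getSchema(Object) method:
import io.confluent.kafka.schemaregistry.json.JsonSchema;
import io.confluent.kafka.schemaregistry.json.JsonSchemaUtils;

class DerivedSchemaSketch {
    // Sketch only: derive a JSON Schema from the annotated POJO and return its canonical
    // form, which can then be reviewed and hand-tuned before being registered in tests.
    static String schemaFor(SampleEvent event) throws java.io.IOException {
        JsonSchema derived = JsonSchemaUtils.getSchema(event);
        return derived.canonicalString();
    }
}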
Writing the Test Cases
As part of the configuration, we need to provide a SchemaRegistryClient bean of type MockSchemaRegistryClient and register the serializer to use this mock Schema Registry instead of an actual one. The ProducerFactory should also be registered to use this serializer. We will do this using a static inner test configuration class.
class SchemaRegistryTest extends KafkaTestSupport {

    @TestConfiguration
    public static class TestConfig<K, V> {

        @Autowired
        KafkaProperties properties;

        @Bean
        public SchemaRegistryClient schemaRegistryClient() throws RestClientException, IOException {
            SchemaProvider provider = new JsonSchemaProvider();
            return new MockSchemaRegistryClient(Collections.singletonList(provider));
        }

        @Bean
        public KafkaJsonSchemaSerializer<V> kafkaJsonSchemaSerializer(SchemaRegistryClient schemaRegistryClient)
                throws RestClientException, IOException {
            return new KafkaJsonSchemaSerializer<V>(schemaRegistryClient);
        }

        @Bean
        public ProducerFactory<K, V> producerFactory(Serializer<V> vSerializer) {
            return new DefaultKafkaProducerFactory<K, V>(properties.buildProducerProperties(),
                    (Serializer<K>) new StringSerializer(), vSerializer);
        }
    }
}
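Note that we do not need to declare a KafkaTemplate bean ourselves: Spring Boot's auto-configuration builds one from whatever ProducerFactory bean is present in the context, so the template we autowire later already uses the mock-registry-backed serializer. If you prefer to declare it explicitly inside the TestConfig class above, a minimal sketch (my own addition, not something the setup requires) would be:
@Bean
public KafkaTemplate<K, V> kafkaTemplate(ProducerFactory<K, V> producerFactory) {
    // Builds the template on top of the ProducerFactory defined above, so values are
    // serialized by the KafkaJsonSchemaSerializer wired to the mock Schema Registry.
    return new KafkaTemplate<>(producerFactory);
}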
Once our mock Schema Registry has been set up, the next step is to generate a valid schema for the event model and register it with the mock Schema Registry. Following the default subject naming strategy, for a topic called test, the corresponding subject name will be test-value. For more details on topics and subjects and how the subject naming strategies work, I would suggest reading this documentation from Confluent.
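To make the topic-to-subject mapping concrete, here is a small sketch, assuming the subjectName method of the newer SubjectNameStrategy interface implemented by the default TopicNameStrategy:
import io.confluent.kafka.serializers.subject.TopicNameStrategy;

class SubjectNamingSketch {
    // Sketch only: the default strategy appends "-value" (or "-key") to the topic name,
    // so topic "test" yields subject "test-value" for value schemas.
    static String valueSubjectFor(String topic) {
        return new TopicNameStrategy().subjectName(topic, false, null);
    }
}
With the subject name settled, the test parses the hand-written schema, registers it against that subject, and publishes an event: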
class SchemaRegistryTest extends KafkaTestSupport {

    private static final String TOPIC = "test";
    private static final String SUBJECT = "test-value";
    private static final String SCHEMA_STR = "{\"$schema\":\"http://json-schema.org/draft-07/schema#\",\"title\":\"Sample Event\",\"type\":\"object\",\"additionalProperties\":true,\"properties\":{\"id\":{\"type\":\"integer\"},\"name\":{\"type\":\"string\"},\"date\":{\"type\":\"number\"},\"subEvents\":{\"type\":\"array\",\"items\":{\"$ref\":\"#/definitions/SampleSubEvent\"}}},\"required\":[\"id\",\"date\",\"subEvents\"],\"definitions\":{\"SampleSubEvent\":{\"type\":\"object\",\"additionalProperties\":true,\"properties\":{\"id\":{\"type\":\"integer\"},\"name\":{\"type\":\"string\"}},\"required\":[\"id\"]}}}";
    private static final String SOME_INVALID_SCHEMA_STRING = "some_junk";

    @Autowired
    private KafkaTemplate<String, SampleEvent> template;

    @Autowired
    private SchemaRegistryClient schemaRegistryClient;

    @Test
    void should_publish_successfully_on_valid_schema() throws RestClientException, IOException {
        // parse and register the schema
        Optional<ParsedSchema> parsedSchema = schemaRegistryClient.parseSchema("JSON", SCHEMA_STR,
                Collections.emptyList());
        if (parsedSchema.isPresent()) {
            ParsedSchema schema = parsedSchema.get();
            schemaRegistryClient.register(SUBJECT, schema, 1, 1);
        }
        SampleEvent e = new SampleEvent();
        // send and assert that no exceptions are thrown
        Assertions.assertDoesNotThrow(() -> template.send(TOPIC, e));
    }

    @TestConfiguration
    public static class TestConfig<K, V> {
        ...
    }
}
Similarly, to test that an exception is thrown, register a wrong schema or try publishing a different event.
@Test
void should_throw_error_on_invalid_schema() throws RestClientException, IOException {
    Optional<ParsedSchema> parsedSchema = schemaRegistryClient.parseSchema("JSON", SOME_INVALID_SCHEMA_STRING,
            Collections.emptyList());
    if (parsedSchema.isPresent()) {
        ParsedSchema schema = parsedSchema.get();
        schemaRegistryClient.register(SUBJECT, schema, 1, 1);
    }
    SampleEvent e = new SampleEvent();
    // send and assert that an exception is thrown
    Assertions.assertThrows(Exception.class,
            () -> template.send(TOPIC, e));
}
And voilà!
I hope this article serves as a helpful resource for testing a Kafka Schema Registry integration involving JSON Schema.