Kafka JDBC Source Connector for Large Data
In this article, we are going to look at how we can use Kafka’s JDBC Source Connector to query a large table or view in batches, for both new and updated records, and publish them to a topic using an Avro schema.
Recently I had a task to migrate some data from an old monolith’s Oracle database to a microservice backed by a PostgreSQL database. The problem was that the parent table holding the data to migrate had around 2 million records spread over 150 columns and, on top of that, everything was exposed through a view with a payload column aggregating data from various tables as XML. As you can imagine, the SELECT from that view was pretty slow, and by pretty I mean insanely slow, which was not going to work very well for the connector. So, in this article we’ll take a look at a similar, simplified use case and how we can deal with it.
Use Case
We have a course-catalog application with a PostgreSQL database that deals with instructors and their courses. Now we need to migrate some legacy instructors from another PostgreSQL database that is soon going to be decommissioned. So we have the instructors-legacy-db and the course-catalog-db. In our case, neither database is overwhelmed with records, with just about 200 rows in the instructors-legacy-db, but for the sake of the example, just imagine that the instructors-legacy-db holds that cumbersome table with 2 million records.
Right, here is the docker-compose.yml:
version: '3'
services:
  course-catalog-operational-db:
    image: postgres:13.3
    container_name: course-catalog-operational-db
    command:
      - "postgres"
      - "-c"
      - "wal_level=logical"
    environment:
      POSTGRES_PASSWORD: 123456
      POSTGRES_DB: course-catalog-db
    ports:
      - "5433:5432"
  instructors-legacy-db:
    image: postgres:13.3
    container_name: instructors-legacy-db
    command:
      - "postgres"
      - "-c"
      - "wal_level=logical"
    environment:
      POSTGRES_PASSWORD: 123456
      POSTGRES_DB: instructors-db
    ports:
      - "5434:5432"
    volumes:
      - ./init.sql:/docker-entrypoint-initdb.d/init.sql
Here is the course-catalog-db/instructors table:
create table instructors
(
id integer not null
primary key,
created_at timestamp not null,
updated_at timestamp not null,
description varchar(3000),
name varchar(255),
summary varchar(3000)
);
And here is the instructors-legacy-db/instructors table:
create table instructors
(
id integer not null
primary key,
created_at timestamp not null,
updated_at timestamp not null,
first_name varchar(255),
last_name varchar(255),
title varchar(255)
);
Also, as you may have noticed, for the instructors-legacy-db container I’m using an init.sql script to create the table and do some inserts on startup, so we’ll have some data to play around with. There is nothing special in that script, just 200 randomly generated inserts; here are a few of them (the rest can be viewed in the repo):
INSERT INTO public.instructors (id, created_at, updated_at, first_name, last_name, title) VALUES (0, '2022-08-28 21:00:19.315552', '2022-08-28 21:00:19.315552', 'Liam', 'Martinez', 'Mr.');
INSERT INTO public.instructors (id, created_at, updated_at, first_name, last_name, title) VALUES (1, '2022-08-28 21:00:19.315552', '2022-08-28 21:00:19.315552', 'Thomas', 'Williams', 'Mr.');
INSERT INTO public.instructors (id, created_at, updated_at, first_name, last_name, title) VALUES (2, '2022-08-28 21:00:19.315552', '2022-08-28 21:00:19.315552', 'Mateo', 'Martinez', 'Mr.');
INSERT INTO public.instructors (id, created_at, updated_at, first_name, last_name, title) VALUES (3, '2022-08-28 21:00:19.315552', '2022-08-28 21:00:19.315552', 'Ciro', 'Smith', 'Mr.');
And here is the view that we are going to address:
create view instructor_aggregate_vw(id, created_at, updated_at, name) as
SELECT instructors.id,
instructors.created_at,
instructors.updated_at,
(((instructors.title::text || ' '::text) || instructors.first_name::text) || ' '::text) ||
instructors.last_name::text AS name
FROM instructors;
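For instance, given the sample inserts above, the first instructor comes out of the view with the title and names already concatenated:
select id, name from instructor_aggregate_vw where id = 0;
-- returns: 0 | Mr. Liam Martinez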
Okay, now that we have everything in place, how do we get our data queried and published?
Kafka Connect in Action
That’s right, we are going to use io.confluent.connect.jdbc.JdbcSourceConnector for that.
The Kafka Connect JDBC Source connector allows you to import data from any relational database with a JDBC driver into an Apache Kafka® topic. This connector can support a wide variety of databases.
But first, we need to set up our Kafka environment. For this, I am going to use the landoop/fast-data-dev Docker image, since it comes with almost everything properly configured, from ZooKeeper, the Schema Registry, Kafka Connect, and the broker to a nice UI provided by Landoop for managing everything Kafka-related. Here is what we are going to add to our docker-compose.yml:
  course-catalog-kafka-cluster:
    container_name: course-catalog-kafka-cluster
    image: landoop/fast-data-dev
    environment:
      ADV_HOST: 127.0.0.1
      RUNTESTS: 0
    ports:
      - 2181:2181
      - 3030:3030
      - 8081-8083:8081-8083
      - 9581-9585:9581-9585
      - 9092:9092
    volumes:
      # Specify an absolute path mapping
      - C:\Users\ionpa\Projects\course-catalog\infra:/my-data
Okay, docker-compose up, and our entire environment is running. You can go ahead and check the UI on http://localhost:3030/ and explore it a bit.
Now, about the connector. Connectors can be added either through the UI on localhost here, following a property-based or JSON-based configuration, or through Kafka Connect’s REST API on http://localhost:8083/connectors, with a JSON body containing your connector’s config (an example request is shown right after the property list below). Let’s take a look at the configuration that we are going to use for our connector:
name=legacy.instructors.connector
topic.prefix=legacy-instructors
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
table.types=VIEW
connection.url=jdbc:postgresql://instructors-legacy-db:5432/instructors-db
connection.user=postgres
connection.password=123456
connection.attempts=100
connection.backoff.ms=300000
poll.interval.ms=100
transforms=AddNamespace,createKey,AddKeyNamespace
transforms.AddNamespace.type=org.apache.kafka.connect.transforms.SetSchemaMetadata$Value
transforms.AddNamespace.schema.name=inc.evil.coursecatalog.InstructorAggregate
transforms.AddKeyNamespace.type=org.apache.kafka.connect.transforms.SetSchemaMetadata$Key
transforms.AddKeyNamespace.schema.name=inc.evil.coursecatalog.Key
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields=id
mode=timestamp+incrementing
timestamp.column.name=updated_at
timestamp.delay.interval.ms=3000
incrementing.column.name=id
numeric.mapping=best_fit
query=select * from(select * from instructor_aggregate_vw where "updated_at" < ? AND (("updated_at" = ? AND "id" > ?) OR "updated_at" > ?) order by "updated_at","id" ASC) as ia limit 50 --
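By the way, if you prefer the REST API over the UI, registering the connector is just a matter of wrapping these same properties into a JSON body; the request shape below is the standard Kafka Connect REST API exposed by the fast-data-dev container on port 8083. A sketch of that request could look like this:
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "legacy.instructors.connector",
    "config": {
      "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
      "topic.prefix": "legacy-instructors",
      "table.types": "VIEW",
      "connection.url": "jdbc:postgresql://instructors-legacy-db:5432/instructors-db",
      "connection.user": "postgres",
      "connection.password": "123456",
      "connection.attempts": "100",
      "connection.backoff.ms": "300000",
      "poll.interval.ms": "100",
      "transforms": "AddNamespace,createKey,AddKeyNamespace",
      "transforms.AddNamespace.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
      "transforms.AddNamespace.schema.name": "inc.evil.coursecatalog.InstructorAggregate",
      "transforms.AddKeyNamespace.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Key",
      "transforms.AddKeyNamespace.schema.name": "inc.evil.coursecatalog.Key",
      "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
      "transforms.createKey.fields": "id",
      "mode": "timestamp+incrementing",
      "timestamp.column.name": "updated_at",
      "timestamp.delay.interval.ms": "3000",
      "incrementing.column.name": "id",
      "numeric.mapping": "best_fit",
      "query": "select * from(select * from instructor_aggregate_vw where \"updated_at\" < ? AND ((\"updated_at\" = ? AND \"id\" > ?) OR \"updated_at\" > ?) order by \"updated_at\",\"id\" ASC) as ia limit 50 --"
    }
  }'
Alternatively, a PUT to http://localhost:8083/connectors/legacy.instructors.connector/config with just the contents of the config object creates or updates the connector idempotently.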
First of all, hats off to Confluent for doing such a great job of documenting everything; you can check the meaning of each property here. But anyway, here is a short description of what we have:
- name – obviously, the connector’s name.
- topic.prefix – because we are using a custom query, this is the name of the topic to which we are going to publish our records.
- connector.class – the connector implementation we are going to use; in our case, io.confluent.connect.jdbc.JdbcSourceConnector.
- table.types – since we are going to query a custom view, the type is VIEW.
- connection.* – connection-related properties: the connection URL, user, and password for our DB, plus some configuration for the number of attempts and the back-off.
- poll.interval.ms – basically, how frequently the connector should poll the table for new records.
- transforms.* – in my case, configuration related to converting/serializing both the payload and the key to Avro.
- mode – one of the most important properties; it can have the following values:
  - bulk – queries the entire table every time
  - incrementing – detects new rows based on an id column
  - timestamp – detects new/updated rows based on an automatically updated timestamp column
  - timestamp+incrementing – considered the most robust and accurate mode, since it combines the two modes above; having both the timestamp column and the id column in place allows us to uniquely identify both new and updated rows
- timestamp.* – defines the required column and the delay in milliseconds for the timestamp mode; in our case, updated_at.
- incrementing.column.name – defines the required column for the incrementing mode; in our case, id.
- numeric.mapping – decides how NUMERIC values are treated; in our case, best_fit, which means numeric columns are cast to INT or FLOAT based on the column’s precision and scale.
- query – the most essential property for this article, so let’s dive a bit deeper here.
Basically, this property defines the query that is going to be used to address the table/view, and for our case, this should be enough, right?
select * from instructor_aggregate_vw
Well, not really. Since we want this query to poll in batches, it needs to be modified. Let’s say we want to query in batches of 50 records; that should be easy to implement with a simple LIMIT, like this:
select * from instructor_aggregate_vw limit 50
Yes and no: yes, that is correct SQL, but no, it won’t work for the Kafka connector. And it won’t work because the query has to be written in such a form that a WHERE clause can be appended to it. Why is that? Because of our specified mode, the connector will append a WHERE clause based on the timestamp/incrementing columns we defined, and the query will end up looking like this:
select * from instructor_aggregate_vw WHERE "updated_at" < ? AND (("updated_at" = ? AND "id" > ?) OR "updated_at" > ?) ORDER BY "updated_at","id" ASC
And if we add the LIMIT 50 to that, it breaks, because in PostgreSQL a WHERE clause cannot come after LIMIT. So how do we deal with this? The trick is to take everything into our own hands: comment out the WHERE clause generated by the connector, handle that WHERE clause ourselves in a subquery, and do the LIMITing there, like this (pay attention to the trailing -- and to the alias on the subquery, which PostgreSQL requires for derived tables):
select * from(select * from instructor_aggregate_vw where "updated_at" < ? AND (("updated_at" = ? AND "id" > ?) OR "updated_at" > ?) order by "updated_at","id" ASC) as ia limit 50 --
Note that the appended (and now commented-out) clause is also how the connector binds its four ? parameters, which is why the subquery repeats exactly the same conditions in the same order: the bound values simply land on our copies of the placeholders. And this approach works for any database; in Oracle, for example, you’d use a WHERE rownum <= 50 instead of LIMIT.
That should be it. If you create this connector, you won’t see much, since our view is very light on data and the streaming happens really fast; but if you were to use a heavy view/table, you would notice the data appearing in your topic in batches of 50 messages.
Consuming
If we want to go even further and consume this data into course-catalog-db/instructors, it is a matter of either defining a sink connector (since we don’t do any transformation) or using a simple consumer; a sketch of the sink connector option is included at the end of this section. In Spring Boot with Kotlin, a Kafka listener would look like this:
@KafkaListener(
    topics = ["\${kafka.consumer.legacyInstructorsTopic}"],
    containerFactory = KafkaConfiguration.LEGACY_INSTRUCTORS_CONTAINER_FACTORY
)
fun handle(
    @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) key: Key,
    @Payload instructorAggregate: InstructorAggregate
) {
    log.info("Received old instructor [${instructorAggregate}]")
    instructorService.upsert(instructorAggregate)
}
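The containerFactory referenced above is not shown in the article, so here is a minimal sketch of what such a configuration could look like, assuming the Confluent Avro deserializer, the local fast-data-dev endpoints from our docker-compose (broker on 9092, Schema Registry on 8081), a made-up consumer group id, and that Key and InstructorAggregate are the Avro-generated classes:
import inc.evil.coursecatalog.InstructorAggregate
import inc.evil.coursecatalog.Key
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory

@Configuration
class KafkaConfiguration {
    companion object {
        const val LEGACY_INSTRUCTORS_CONTAINER_FACTORY = "legacyInstructorsContainerFactory"
    }

    @Bean(LEGACY_INSTRUCTORS_CONTAINER_FACTORY)
    fun legacyInstructorsContainerFactory(): ConcurrentKafkaListenerContainerFactory<Key, InstructorAggregate> {
        // Assumed endpoints and group id; adjust to your environment
        val props = mapOf<String, Any>(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092",
            ConsumerConfig.GROUP_ID_CONFIG to "course-catalog",
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to KafkaAvroDeserializer::class.java,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to KafkaAvroDeserializer::class.java,
            KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG to "http://localhost:8081",
            // Deserialize into the generated classes (Key, InstructorAggregate) instead of GenericRecord
            KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG to true
        )
        val factory = ConcurrentKafkaListenerContainerFactory<Key, InstructorAggregate>()
        factory.setConsumerFactory(DefaultKafkaConsumerFactory<Key, InstructorAggregate>(props))
        return factory
    }
}
The important bit is SPECIFIC_AVRO_READER_CONFIG; without it, the records would arrive as GenericRecord rather than the generated classes.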
For the Avro deserialization, I got the generated Avro schemas from the Schema Registry here and saved the .avsc files. Then, with the help of the id("com.github.davidmc24.gradle.plugin.avro") Gradle plugin, configured like this:
tasks.withType<com.github.davidmc24.gradle.plugin.avro.GenerateAvroJavaTask> {
    source(file("${projectDir}\\src\\main\\resources\\avro"))
    setOutputDir(file("${projectDir}\\src\\main\\kotlin"))
}
I obtained the POJOs used in the listener.
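And if writing a consumer feels like too much work, the sink connector route mentioned at the beginning of this section is also on the table. A hedged sketch of such a JDBC sink configuration pointed at course-catalog-db might look like this (the connector name, table mapping, and upsert/primary-key settings are my assumptions, not part of the original setup):
name=legacy.instructors.sink.connector
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
topics=legacy-instructors
connection.url=jdbc:postgresql://course-catalog-operational-db:5432/course-catalog-db
connection.user=postgres
connection.password=123456
table.name.format=instructors
insert.mode=upsert
pk.mode=record_key
pk.fields=id
auto.create=false
With insert.mode=upsert and pk.mode=record_key, the id from the Avro key should translate into an INSERT ... ON CONFLICT (id) DO UPDATE on the PostgreSQL side, so re-delivered records simply overwrite the existing row, while the extra description and summary columns stay NULL.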
Conclusion
We’ve seen how a connector can be configured to work in batches and track both new and updated records in the most robust way. You can check out all the mentioned code and files here.
That’s all folks, I hope that you’ve learned something. Happy coding!