Debezium Server With PostgreSQL and Redis Stream
Stream your PostgreSQL table changes.
Debezium is a great tool for capturing the row-level changes that happen in a database and streaming those changes to a broker of our choice.
Since this functionality stays within the boundaries of the database, it helps keep our applications simple. For example, there is no need for an application to emit events on its database interactions; Debezium monitors the row changes and emits the events. Depending on the broker used with Debezium, a consumer can subscribe to the broker and receive the changes.
PostgreSQL is a popular SQL database that is supported by Debezium.
Our goal is to listen to PostgreSQL changes and stream them to a Redis stream through a Debezium Server. It is common to use Debezium with Kafka, but in cases where Kafka is not part of a team’s tech stack we can use other brokers.
In our case, we will keep things lightweight by using Redis Streams.
Redis will be set up without any extra configuration.
In order to use PostgreSQL with Debezium, it is essential to alter the configuration of PostgreSQL.
The configuration we shall use on PostgreSQL is the following:
listen_addresses = '*'
port = 5432
max_connections = 20
shared_buffers = 128MB
temp_buffers = 8MB
work_mem = 4MB
wal_level = logical
max_wal_senders = 3
As we can see, we set wal_level to logical, enabling logical decoding in PostgreSQL.
From the documentation:
Logical decoding is the process of extracting all persistent changes to a database’s tables into a coherent, easy to understand format which can be interpreted without detailed knowledge of the database’s internal state.
In PostgreSQL, logical decoding is implemented by decoding the contents of the write-ahead log, which describe changes on a storage level, into an application-specific form such as a stream of tuples or SQL statements.
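Once the stack is running (later in this article), we can double-check from psql that logical decoding is in effect and that a replication slot exists. The queries below are a minimal sketch: the slot only appears after Debezium Server has connected, and its name defaults to debezium for the PostgreSQL connector (it can be changed via the slot.name property).
> SHOW wal_level;
> SELECT slot_name, plugin, slot_type, active FROM pg_replication_slots;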
We will also create a schema (namespace) and a table in PostgreSQL. This schema and table are what we will listen to for changes.
#!/bin/bash
set -e

psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL
  create schema test_schema;
  create table test_schema.employee(
    id SERIAL PRIMARY KEY,
    firstname TEXT NOT NULL,
    lastname TEXT NOT NULL,
    email TEXT NOT NULL,
    age INT NOT NULL,
    salary REAL,
    unique(email)
  );
EOSQL
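Note that the official postgres image runs the scripts under /docker-entrypoint-initdb.d only when the data directory is initialized for the first time. As a quick, optional sanity check (assuming the Compose project name debezium-example used by the commands later on), we can list the tables of the new schema:
$ docker exec -it debezium-example-postgres-1 psql postgres postgres -c '\dt test_schema.*'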
Debezium needs to be able to interact with the PostgreSQL server as well as with the Redis server.
The configuration should be the following:
debezium.sink.type=redis
debezium.sink.redis.address=redis:6379
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=postgres
debezium.source.database.port=5432
debezium.source.database.user=postgres
debezium.source.database.password=postgres
debezium.source.database.dbname=postgres
debezium.source.database.server.name=tutorial
debezium.source.schema.whitelist=test_schema
debezium.source.plugin.name=pgoutput
By examining the configuration, we can see that it contains the information Debezium needs in order to communicate with the PostgreSQL database. We also specify the schema we created previously, therefore only changes from that schema will be streamed. We can also make things more restrictive, for example by whitelisting specific tables, as sketched below.
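As a sketch of such a restriction, a filter that captures only the employee table would look along these lines (on older Debezium versions the equivalent deprecated property is debezium.source.table.whitelist):
debezium.source.table.include.list=test_schema.employee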
Since this demo involves three different software components, Docker Compose will come in handy.
version: '3.1'
services:
  redis:
    image: redis
    ports:
      - 6379:6379
    depends_on:
      - postgres
  postgres:
    image: postgres
    restart: always
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
    volumes:
      - ./postgresql.conf:/etc/postgresql/postgresql.conf
      - ./init:/docker-entrypoint-initdb.d
    command:
      - "-c"
      - "config_file=/etc/postgresql/postgresql.conf"
    ports:
      - 5432:5432
  debezium:
    image: debezium/server
    volumes:
      - ./conf:/debezium/conf
      - ./data:/debezium/data
    depends_on:
      - redis
Using Compose, we can spin up three different software components on the same network. This helps the components interact with each other by using the DNS names of the services as specified in Compose. Also, the configuration files we created previously are mounted into the Docker containers. Docker Compose V2 is out there with many good features; you can find more about it in the book I authored, A Developer’s Essential Guide to Docker Compose.
In order to get the stack running, we shall execute the following command:
$ docker compose up
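Optionally, we can confirm that the three containers are up and follow the Debezium Server logs while the connector starts and takes its initial snapshot:
$ docker compose ps
$ docker compose logs -f debezium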
Once the stack is up and running, we can start listening for events.
We shall log in to Redis and listen for any possible database updates.
$ docker exec -it debezium-example-redis-1 redis-cli
> xread block 1000000 streams tutorial.test_schema.employee $
This command will block until we receive an event from the stream.
If we examine the stream name, we can see the pattern {server-name}.{schema}.{table}. This allows consumers to subscribe only to the changes of interest, for example through a consumer group as sketched below.
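As a sketch of such a consumer using plain redis-cli (the group name employee-group and the consumer name consumer-1 are arbitrary examples), a Redis consumer group lets several consumers share the stream and acknowledge entries:
> xgroup create tutorial.test_schema.employee employee-group $ mkstream
> xreadgroup group employee-group consumer-1 count 10 block 5000 streams tutorial.test_schema.employee >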
Next, we will make an entry.
$ docker exec -it debezium-example-postgres-1 psql postgres postgres
> insert into test_schema.employee (firstname,lastname,email,age,salary) values ('John','Doe 1','john1@doe.com',18,1234.23);
> \q
If we check the Redis session, we should see that we received an event from the Redis stream:
127.0.0.1:6379> xread block 1000000 streams tutorial.test_schema.employee $
1) 1) "tutorial.test_schema.employee"
2) 1) 1) "1663796657336-0"
2) 1) "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"default\":0,\"field\":\"id\"}],\"optional\":false,\"name\":\"tutorial.test_schema.employee.Key\"},\"payload\":{\"id\":1}}"
2) "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"default\":0,\"field\":\"id\"},{\"type\":\"string\",\"optional\":false,\"field\":\"firstname\"},{\"type\":\"string\",\"optional\":false,\"field\":\"lastname\"},{\"type\":\"string\",\"optional\":false,\"field\":\"email\"},{\"type\":\"int32\",\"optional\":false,\"field\":\"age\"},{\"type\":\"float\",\"optional\":true,\"field\":\"salary\"}],\"optional\":true,\"name\":\"tutorial.test_schema.employee.Value\",\"field\":\"before\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"default\":0,\"field\":\"id\"},{\"type\":\"string\",\"optional\":false,\"field\":\"firstname\"},{\"type\":\"string\",\"optional\":false,\"field\":\"lastname\"},{\"type\":\"string\",\"optional\":false,\"field\":\"email\"},{\"type\":\"int32\",\"optional\":false,\"field\":\"age\"},{\"type\":\"float\",\"optional\":true,\"field\":\"salary\"}],\"optional\":true,\"name\":\"tutorial.test_schema.employee.Value\",\"field\":\"after\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"field\":\"version\"},{\"type\":\"string\",\"optional\":false,\"field\":\"connector\"},{\"type\":\"string\",\"optional\":false,\"field\":\"name\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"ts_ms\"},{\"type\":\"string\",\"optional\":true,\"name\":\"io.debezium.data.Enum\",\"version\":1,\"parameters\":{\"allowed\":\"true,last,false,incremental\"},\"default\":\"false\",\"field\":\"snapshot\"},{\"type\":\"string\",\"optional\":false,\"field\":\"db\"},{\"type\":\"string\",\"optional\":true,\"field\":\"sequence\"},{\"type\":\"string\",\"optional\":false,\"field\":\"schema\"},{\"type\":\"string\",\"optional\":false,\"field\":\"table\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"txId\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"lsn\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"xmin\"}],\"optional\":false,\"name\":\"io.debezium.connector.postgresql.Source\",\"field\":\"source\"},{\"type\":\"string\",\"optional\":false,\"field\":\"op\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"ts_ms\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"field\":\"id\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"total_order\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"data_collection_order\"}],\"optional\":true,\"field\":\"transaction\"}],\"optional\":false,\"name\":\"tutorial.test_schema.employee.Envelope\"},\"payload\":{\"before\":null,\"after\":{\"id\":1,\"firstname\":\"John\",\"lastname\":\"Doe 1\",\"email\":\"john1@doe.com\",\"age\":18,\"salary\":1234.23},\"source\":{\"version\":\"1.9.5.Final\",\"connector\":\"postgresql\",\"name\":\"tutorial\",\"ts_ms\":1663796656393,\"snapshot\":\"false\",\"db\":\"postgres\",\"sequence\":\"[null,\\\"24289128\\\"]\",\"schema\":\"test_schema\",\"table\":\"employee\",\"txId\":738,\"lsn\":24289128,\"xmin\":null},\"op\":\"c\",\"ts_ms\":1663796657106,\"transaction\":null}}"
(10.17s)
127.0.0.1:6379>
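To see a different operation type, we can update the row we just inserted; a new event will appear on the same stream with "op" set to "u" (Debezium uses c, u, d, and r for create, update, delete, and snapshot read respectively). The salary value below is just an example.
$ docker exec -it debezium-example-postgres-1 psql postgres postgres
> update test_schema.employee set salary = 4321.00 where id = 1;
> \q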
How cool is that? We can now start streaming our database changes to the broker of our choice.
You can find the source code on GitHub.