Using PostgreSQL pgoutput Plugin for Change Data Capture With Debezium
In this article, see a quick walkthrough on how to use the PostgreSQL pgoutput plugin for change data capture with Debezium.
Change Data Capture Architecture Using Debezium, Postgres, and Kafka was a tutorial on how to use Debezium for change data capture from Azure PostgreSQL and send the change events to Azure Event Hubs for Kafka - it used the wal2json output plugin.
What About the pgoutput Plugin?
This blog will provide a quick walkthrough of how to use the pgoutput plugin. I will not be repeating a lot of details, and will use containerized versions (via Docker Compose) of Kafka Connect, Kafka, and Zookeeper to keep things simple. So, the only thing you need is Azure PostgreSQL, which you can set up using a variety of options including the Azure Portal, Azure CLI, Azure PowerShell, or an ARM template.
The resources are available on GitHub - https://github.com/abhirockzz/debezium-postgres-pgoutput
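For reference, here is a minimal sketch of what such a Docker Compose file typically looks like when using the standard Debezium container images - the actual file in the repo may differ in details (ports, environment variables, topic names):

version: '2'
services:
  zookeeper:
    image: debezium/zookeeper:${DEBEZIUM_VERSION}
    ports:
      - 2181:2181
  kafka:
    image: debezium/kafka:${DEBEZIUM_VERSION}
    ports:
      - 9092:9092
    depends_on:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
  connect:
    image: debezium/connect:${DEBEZIUM_VERSION}
    ports:
      - 8083:8083
    depends_on:
      - kafka
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses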
Using the Right publication.autocreate.mode
With the pgoutput plugin, it's important that you use the appropriate value for publication.autocreate.mode. If you're using all_tables (which is the default), you need to ensure that the publication is created up-front for the specific table(s) you want to configure for change data capture. If the publication is not found, the connector will try to create one using CREATE PUBLICATION <publication_name> FOR ALL TABLES; which will fail due to lack of superuser permissions.
The other two options work as expected:
- disabled: you need to ensure that the publication is created up-front (see the example after this list). The connector will not attempt to create the publication if it doesn't exist on startup - it will throw an exception and stop.
- filtered: you can (optionally) choose to create the publication up-front. If the publication is not found, the connector will create a new one covering the tables that match the current filter configuration.
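For example, to create a publication up-front that covers just the inventory table used later in this walkthrough (the publication name mytestpub matches the one used in the connector config below):

CREATE PUBLICATION mytestpub FOR TABLE inventory;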
This has been highlighted in the docs: https://debezium.io/documentation/reference/1.3/connectors/postgresql.html#postgresql-on-azure
Let's Try the Different Scenarios
Before that:
git clone https://github.com/abhirockzz/debezium-postgres-pgoutput && cd debezium-postgres-pgoutput
Start Kafka, Zookeeper and Kafka Connect containers:
export DEBEZIUM_VERSION=1.2
docker-compose up
It might take a while to pull the container images the first time.
Once all the containers are up and running, connect to Azure PostgreSQL, create a table and insert some data:
psql -h <DBNAME>.postgres.database.azure.com -p 5432 -U <DBUSER>@<DBNAME> -W -d postgres --set=sslmode=require
psql -h abhishgu-pg.postgres.database.azure.com -p 5432 -U abhishgu-pg -W -d postgres --set=sslmode=require
CREATE TABLE inventory (id SERIAL, item VARCHAR(30), qty INT, PRIMARY KEY(id));
When publication.autocreate.mode is set to filtered
This works well with Azure PostgreSQL - it does not require superuser permissions because the connector creates the publication for the specific table(s) based on the table filter/include list configuration.
Update the connector config file (pg-source-connector.json) with details of your Azure PostgreSQL instance.
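For reference, here is a sketch of what pg-source-connector.json typically contains - the property names follow the Debezium PostgreSQL connector documentation, but the exact contents of the file in the repo may differ. The connector name (inventory-connector), server name (myserver), and publication name (mytestpub) below match the names used elsewhere in this post:

{
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "<DBNAME>.postgres.database.azure.com",
        "database.port": "5432",
        "database.user": "<DBUSER>@<DBNAME>",
        "database.password": "<PASSWORD>",
        "database.dbname": "postgres",
        "database.sslmode": "require",
        "database.server.name": "myserver",
        "plugin.name": "pgoutput",
        "publication.name": "mytestpub",
        "publication.autocreate.mode": "filtered",
        "table.include.list": "public.inventory"
    }
}

Depending on the connector version, the table filter property may be table.include.list (newer) or table.whitelist (older).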
To create the connector:
curl -X POST -H "Content-Type: application/json" --data @pg-source-connector.json http://localhost:8083/connectors
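Optionally, you can confirm that the connector was registered and its task is running by querying the Kafka Connect REST API (inventory-connector is the connector name used in this walkthrough):

curl http://localhost:8083/connectors/inventory-connector/status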
Notice the logs (in the docker compose terminal):
Creating new publication 'mytestpub' for plugin 'PGOUTPUT' [io.debezium.connector.postgresql.connection.PostgresReplicationConnection]
Once the connector starts, check the publications in PostgreSQL:
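One way to do this (from the psql session opened earlier) is to query the pg_publication_tables view:

select * from pg_publication_tables;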
pubname | schemaname | tablename
-----------+------------+-----------
mytestpub | public | inventory
Does it work?
Insert a couple of records into the inventory table:
psql -h <DBNAME>.postgres.database.azure.com -p 5432 -U <DBUSER>@<DBNAME> -W -d postgres --set=sslmode=require
INSERT INTO inventory (item, qty) VALUES ('apples', '100');
INSERT INTO inventory (item, qty) VALUES ('oranges', '42');
select * from inventory;
The connector should push the change events from PostgreSQL WAL (write ahead log) to Kafka. Check the messages in the corresponding Kafka topic:
# exec into the kafka docker container
docker exec -it debezium-postgres-pgoutput_kafka_1 bash
cd bin && ./kafka-console-consumer.sh --topic myserver.public.inventory --bootstrap-server kafka:9092 --from-beginning
You should see a couple of change log event payloads (corresponding to the two INSERTs) - yes, they are verbose, since the schema is included in the payload.
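Each event value is roughly of this shape (heavily abridged: the schema portion is omitted and the field values shown are illustrative):

{
    "payload": {
        "before": null,
        "after": { "id": 1, "item": "apples", "qty": 100 },
        "source": {
            "connector": "postgresql",
            "db": "postgres",
            "schema": "public",
            "table": "inventory"
        },
        "op": "c",
        "ts_ms": 1600000000000
    }
}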
Change publication.autocreate.mode to disabled
For this mode, we need a publication created up-front. Since we already have one (mytestpub), just use it. All you need to do is update publication.autocreate.mode in pg-source-connector.json to disabled.
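The relevant fragment of pg-source-connector.json would then look something like this (a sketch; other properties unchanged):

"publication.name": "mytestpub",
"publication.autocreate.mode": "disabled"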
Re-create the connector:
# delete the existing connector
curl -X DELETE localhost:8083/connectors/inventory-connector
# create it again
curl -X POST -H "Content-Type: application/json" --data @pg-source-connector.json http://localhost:8083/connectors
Test it end to end using the same steps as in the previous section - everything should work just fine!
Just to confirm, update the publication.name in the connector config to one that does not exist. The connector will fail to start due to the missing publication (as expected).
Try publication.autocreate.mode = all_tables
Set publication.autocreate.mode to all_tables, set publication.name to one that does not exist (e.g. testpub1), and create the connector:
curl -X POST -H "Content-Type: application/json" --data @pg-source-connector.json http://localhost:8083/connectors
As expected, it will fail with an error similar to this:
....
INFO Creating new publication 'testpub1' for plugin 'PGOUTPUT' (io.debezium.connector.postgresql.connection.PostgresReplicationConnection:127)
ERROR WorkerSourceTask{id=inventory-connector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
io.debezium.jdbc.JdbcConnectionException: ERROR: must be superuser to create FOR ALL TABLES publication
....
Notice the part must be superuser to create FOR ALL TABLES publication - as previously mentioned, CREATE PUBLICATION <publication_name> FOR ALL TABLES; failed due to lack of superuser permissions. As mentioned earlier, you can work around this by creating the publication manually for the specific tables only.
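For example, assuming the same inventory table and the testpub1 publication name used above, the publication could be created manually before starting the connector:

CREATE PUBLICATION testpub1 FOR TABLE inventory;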
Clean Up
To clean up, delete the Azure PostgreSQL instance using az postgres server delete and remove the containers:
az postgres server delete -g <resource group> -n <server name>
docker-compose down -v
That's it for this short blog post. Stay tuned for more!