Emitting Protocol Buffers Using CockroachDB CDC Queries
This demonstration combines several recent features to serialize CockroachDB rows to Protocol Buffers and emit them via CDC Queries.
Previous Articles on CockroachDB CDC
- Using CockroachDB CDC with Apache Pulsar
- Using CockroachDB CDC with Azure Event Hubs
- Using CockroachDB CDC with Confluent Cloud Kafka and Schema Registry
- SaaS Galore: Integrating CockroachDB With Confluent Kafka, Fivetran, and Snowflake
- CockroachDB CDC Using Minio as Cloud Storage Sink - Part 3
- CockroachDB CDC using Hadoop Ozone S3 Gateway as cloud storage sink
Motivation
Protocol Buffers are a language-neutral, platform-neutral, extensible mechanism for serializing structured data. They are a common choice for platforms that need to pass messages between systems. CockroachDB is a distributed SQL database built on a transactional and strongly consistent key-value store. It scales horizontally; survives disk, machine, rack, and even datacenter failures with minimal latency disruption and no manual intervention; supports strongly consistent ACID transactions; and provides a familiar SQL API for structuring, manipulating, and querying data.
There is no official support for Protocol Buffers in CockroachDB changefeeds, even though we use Protocol Buffers extensively in the code. A recent customer conversation led to this experiment, in which I use several recent features to serialize CockroachDB rows to proto and emit them via CDC Queries. This is also the first time we're looking at CDC Queries, a new, flexible way to express CockroachDB streams.
This tutorial assumes you have an enterprise license. Because the features in this tutorial are not part of any released product, you have to follow the steps exactly as described and pull the right source code to make it work. These features are not available in any of the current offerings from Cockroach Labs.
High-Level Steps
- Build CockroachDB with the Protocol Buffers function
- Deploy a CockroachDB cluster with enterprise changefeeds
- Deploy a webhook sink
- Verify
- Conclusion
Step-by-Step Instructions
Build CockroachDB With the Protocol Buffers Function
Before I show you how to get this working, I'd like to express my gratitude to Yevgeniy Miretskiy, who works on the CDC team, for building the capability and for his mentorship in getting this working. The source code for the feature is available in the following commit. For brevity, I will skip the steps to set up a build environment.
Check out the pull request:
gh pr checkout 89955
Run the preliminary steps to build Cockroach from the source.
./dev doctor
Finally, build the code.
bazel build pkg/cmd/cockroach-short
Navigate to the directory with the built package.
cd _bazel/bin/pkg/cmd/cockroach
Deploy a CockroachDB Cluster With Enterprise Changefeeds
Start an instance of CockroachDB using the built package:
./cockroach start-single-node --insecure --background
To enable CDC we need to execute the following commands:
SET CLUSTER SETTING cluster.organization = '<organization name>';
SET CLUSTER SETTING enterprise.license = '<secret>';
SET CLUSTER SETTING kv.rangefeed.enabled = true;
Generate sample data:
CREATE TABLE office_dogs (
    id INT PRIMARY KEY,
    name STRING
);
INSERT INTO office_dogs VALUES (1, 'Petee'), (2, 'Carl');
UPDATE office_dogs SET name = 'Petee H' WHERE id = 1;
SELECT * FROM office_dogs;
  id |  name
-----+----------
   1 | Petee H
   2 | Carl
The function we are going to use to convert rows to Protocol Buffers is crdb_internal.row_to_proto(). With the given pull request, this function is readily available for querying.
SELECT crdb_internal.row_to_proto(office_dogs) FROM office_dogs;
                          crdb_internal.row_to_proto
------------------------------------------------------------------------------
  \x0a110a046e616d6512091a07506574656520480a0f0a026964120911000000000000f03f
  \x0a0e0a046e616d6512061a044361726c0a0f0a0269641209110000000000000040
It takes the row and serializes it as proto. We can decode the row back to human-readable form with the following query. Skip the leading \x and pass the rest of the output to the decode function:
SELECT crdb_internal.pb_to_json('google.protobuf.Struct', decode('0a110a046e616d6512091a07506574656520480a0f0a026964120911000000000000f03f', 'hex')) AS proto;
             proto
--------------------------------
  {"id": 1, "name": "Petee H"}
SELECT crdb_internal.pb_to_json('google.protobuf.Struct', decode('0a0e0a046e616d6512061a044361726c0a0f0a0269641209110000000000000040', 'hex')) AS proto;
            proto
-----------------------------
  {"id": 2, "name": "Carl"}
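The same bytes can also be unpacked outside the database, without the patched build. The following is a minimal Python sketch, assuming the payload is a google.protobuf.Struct, as the pb_to_json calls above indicate. It hand-parses the protobuf wire format using only the standard library. The helper names (read_varint, read_fields, decode_value, decode_struct, decode_row) are made up for this illustration, and a real consumer would more likely rely on the official protobuf library.

```python
import struct


def read_varint(buf, pos):
    """Read a base-128 varint starting at pos; return (value, next_pos)."""
    result, shift = 0, 0
    while True:
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7


def read_fields(buf):
    """Yield (field_number, value) for every field of one message."""
    pos = 0
    while pos < len(buf):
        tag, pos = read_varint(buf, pos)
        field, wire = tag >> 3, tag & 0x07
        if wire == 0:  # varint
            value, pos = read_varint(buf, pos)
        elif wire == 1:  # fixed 64-bit
            value, pos = buf[pos:pos + 8], pos + 8
        elif wire == 2:  # length-delimited
            size, pos = read_varint(buf, pos)
            value, pos = buf[pos:pos + size], pos + size
        else:
            raise ValueError(f"unsupported wire type {wire}")
        yield field, value


def decode_value(buf):
    """Decode a google.protobuf.Value (only the kinds handled below)."""
    for field, value in read_fields(buf):
        if field == 1:  # null_value
            return None
        if field == 2:  # number_value: little-endian double
            return struct.unpack("<d", value)[0]
        if field == 3:  # string_value
            return value.decode("utf-8")
        if field == 4:  # bool_value
            return bool(value)
        if field == 5:  # struct_value
            return decode_struct(value)
        raise ValueError(f"unsupported Value field {field}")
    return None


def decode_struct(buf):
    """Decode a google.protobuf.Struct: a map of string keys to Values."""
    out = {}
    for field, entry in read_fields(buf):
        if field != 1:  # Struct.fields is field number 1
            continue
        key, val = "", None
        for f, v in read_fields(entry):
            if f == 1:  # map entry key
                key = v.decode("utf-8")
            elif f == 2:  # map entry value
                val = decode_value(v)
        out[key] = val
    return out


def decode_row(hex_text):
    """Accept the hex text as printed by SQL, with or without the \\x prefix."""
    if hex_text.startswith("\\x"):
        hex_text = hex_text[2:]
    return decode_struct(bytes.fromhex(hex_text))


if __name__ == "__main__":
    print(decode_row(
        "0a110a046e616d6512091a07506574656520480a0f0a026964120911000000000000f03f"
    ))
```

Numbers come back as Python floats because Struct stores every number as a double, so the INT column id arrives as 1.0 rather than 1.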
We can use this function in the CDC query, but first, let's set up a webhook sink for a quick demonstration of changefeed queries.
git clone https://github.com/cockroachlabs/cdc-webhook-sink-test-server.git
cd cdc-webhook-sink-test-server
cd go-https-server
chmod +x server.sh
./server.sh
./server.sh
......+.........+.....+....+...+.....+...+.+.....+++++++++++++++++++++++++++++++++++++++++++++*.+...........+.+......+.....+...+....+...+......+...........+...+......+++++++++++++++++++++++++++++++++++++++++++++*......+.....+.............+...+..+.+.....................+......+..+.+++++
-----
2023/08/30 09:39:05 starting server on port 3000
With all of the basics in place, we can create a changefeed.
CREATE CHANGEFEED INTO 'webhook-https://localhost:3000?insecure_tls_skip_verify=true'
WITH updated
AS SELECT crdb_internal.row_to_proto(office_dogs) AS proto
FROM office_dogs;
        job_id
----------------------
  895654238351589377
Verify
Looking at the terminal where the webhook sink is running:
2023/08/30 11:11:25 {"payload":[{"__crdb__": {"updated": "1693408285757033000.0000000000"}, "proto": "\\x0a0e0a046e616d6512061a044361726c0a0f0a0269641209110000000000000040"}],"length":1}
2023/08/30 11:11:25 {"payload":[{"__crdb__": {"updated": "1693408285757033000.0000000000"}, "proto": "\\x0a110a046e616d6512091a07506574656520480a0f0a026964120911000000000000f03f"}],"length":1}
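A consumer behind the webhook has to pull the proto bytes and the updated timestamp out of each JSON body. Here is a minimal Python sketch of that step, assuming the body has the shape shown in the log lines above and that the updated value is an HLC timestamp written as wall-clock nanoseconds, a dot, and a logical counter. The names ChangefeedRow and parse_webhook_body are hypothetical and are not part of CockroachDB or the test server.

```python
import json
from datetime import datetime, timedelta, timezone
from typing import List, NamedTuple


class ChangefeedRow(NamedTuple):
    wall_nanos: int   # wall-clock part of the "updated" timestamp
    logical: int      # logical counter part of the "updated" timestamp
    proto: bytes      # serialized row, still protobuf-encoded

    def updated_at(self) -> datetime:
        """Wall-clock part as a UTC datetime (truncated to microseconds)."""
        seconds, nanos = divmod(self.wall_nanos, 1_000_000_000)
        base = datetime.fromtimestamp(seconds, tz=timezone.utc)
        return base + timedelta(microseconds=nanos // 1000)


def parse_webhook_body(body: str) -> List[ChangefeedRow]:
    """Extract every row from one webhook request body."""
    doc = json.loads(body)
    rows = []
    for item in doc["payload"]:
        wall, _, logical = item["__crdb__"]["updated"].partition(".")
        hex_text = item["proto"]
        # After JSON decoding, the value starts with a literal backslash-x.
        if hex_text.startswith("\\x"):
            hex_text = hex_text[2:]
        rows.append(ChangefeedRow(int(wall), int(logical or 0),
                                  bytes.fromhex(hex_text)))
    return rows


if __name__ == "__main__":
    # Body copied from the webhook log above (raw string keeps the backslashes).
    sample = (
        r'{"payload":[{"__crdb__": {"updated": '
        r'"1693408285757033000.0000000000"}, "proto": '
        r'"\\x0a0e0a046e616d6512061a044361726c0a0f0a0269641209110000000000000040"}],'
        r'"length":1}'
    )
    for row in parse_webhook_body(sample):
        print(row.updated_at().isoformat(), row.proto.hex())
```

The proto bytes are left undecoded on purpose so that any protobuf decoder can be plugged in afterwards.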
Let's update a record in the office_dogs table:
UPDATE office_dogs SET name = 'Tarzan' WHERE id = 1;
2023/08/30 11:12:58 {"payload":[{"__crdb__": {"updated": "1693408377084928000.0000000000"}, "proto": "\\x0a100a046e616d6512081a065461727a616e0a0f0a026964120911000000000000f03f"}],"length":1}
If we use the decode function to inspect the payload:
SELECT crdb_internal.pb_to_json('google.protobuf.Struct', decode('0a100a046e616d6512081a065461727a616e0a0f0a026964120911000000000000f03f', 'hex')) AS proto;
             proto
-------------------------------
  {"id": 1, "name": "Tarzan"}
I have to mention that the emitted messages are dynamically typed: each row is serialized as a generic google.protobuf.Struct rather than as a strongly typed message with its own schema. If your use case requires strongly typed messages, that's a conversation we have to have another time.
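To see what dynamic typing means at the byte level, the following Python sketch rebuilds the Tarzan payload by hand as a google.protobuf.Struct. It is not the implementation of crdb_internal.row_to_proto; it only assumes that the fields are written in the order seen in the output above (name, then id). All function names are hypothetical.

```python
import struct


def varint(n: int) -> bytes:
    """Encode a non-negative integer as a base-128 varint."""
    out = bytearray()
    while True:
        byte = n & 0x7F
        n >>= 7
        if n:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def field(number: int, wire_type: int, payload: bytes) -> bytes:
    """Prefix a payload with its field tag."""
    return varint((number << 3) | wire_type) + payload


def length_delimited(number: int, payload: bytes) -> bytes:
    """Wire type 2: tag, length, then the raw bytes."""
    return field(number, 2, varint(len(payload)) + payload)


def encode_value(value) -> bytes:
    """Encode a scalar as google.protobuf.Value."""
    if value is None:
        return field(1, 0, varint(0))            # null_value
    if isinstance(value, bool):
        return field(4, 0, varint(int(value)))   # bool_value
    if isinstance(value, (int, float)):
        # Struct has a single numeric kind: every number becomes a double.
        return field(2, 1, struct.pack("<d", float(value)))
    if isinstance(value, str):
        return length_delimited(3, value.encode("utf-8"))
    raise TypeError(f"unsupported type {type(value).__name__}")


def encode_struct(row: dict) -> bytes:
    """Encode a dict as google.protobuf.Struct, keeping insertion order."""
    out = b""
    for key, value in row.items():
        entry = (length_delimited(1, key.encode("utf-8"))
                 + length_delimited(2, encode_value(value)))
        out += length_delimited(1, entry)
    return out


if __name__ == "__main__":
    print(encode_struct({"name": "Tarzan", "id": 1}).hex())
```

The column names travel inside every message and the INT column id is written as an 8-byte double, which is the trade-off of a schema-less Struct compared with a strongly typed message.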
Conclusion
This is how you can leverage CockroachDB CDC Queries with built-in functions. The row_to_proto function is not available in a released version, but it can be, given enough demand. Hopefully, you've found this article useful. Please reach out to our team if you need this capability, and we will consider it in the future.
Published at DZone with permission of Artem Ervits. See the original article here.