Using DuckDB With CockroachDB
Explore this fun experiment using DuckDB to parse CockroachDB Change Data Capture output and query CockroachDB with DuckDB.
Motivation
CockroachDB has native support for change data capture. It supports object storage sinks across all major cloud providers. At the time of writing, a couple of output formats are available: Avro and newline-delimited JSON. Up until now, I've been avoiding newline-delimited JSON because I don't find it easy to use. Today, I'd like to look at DuckDB as a viable tool for parsing the CDC-generated output in newline-delimited format.
High-Level Steps
- Start a CockroachDB cluster
- Parse CockroachDB newline-delimited changefeed output using DuckDB
- Query CockroachDB tables using DuckDB
- Conclusion
Step-By-Step Instructions
Start a CockroachDB Cluster
I am using a serverless instance of CockroachDB, which has enterprise changefeeds enabled by default. You can sign up for a free instance.
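If you run a self-hosted cluster instead, changefeeds additionally require rangefeeds to be enabled. A minimal sketch of what that looks like (assuming an enterprise license is already in place):
SET CLUSTER SETTING kv.rangefeed.enabled = true;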
Parse CockroachDB Newline-Delimited Changefeed Output Using DuckDB
We're going to follow the CockroachDB example for sending sample data to an S3 bucket. DuckDB supports reading from S3 directly, but today I'm going to download the files to my machine and parse them locally.
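For reference, reading straight from S3 would look roughly like the sketch below, using DuckDB's httpfs extension; the bucket path mirrors my setup and the region is a placeholder, so adjust both to your environment.
INSTALL httpfs;
LOAD httpfs;
SET s3_region = 'us-east-1';
SET s3_access_key_id = '<AWS_ACCESS_KEY_ID>';
SET s3_secret_access_key = '<AWS_SECRET_ACCESS_KEY>';
-- same globbing pattern used later in this post, just pointed at the bucket
SELECT * FROM read_ndjson_objects('s3://artemawsbucket/tpcc/history/*.ndjson') LIMIT 5;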
I'm using the tpcc workload to generate changefeed data, but you can use the example in the doc above.
Initialize:
cockroach workload init tpcc \
 --warehouses 100 $DATABASE_URL
Execute the workload:
cockroach workload run tpcc \
 --duration=120m \
 --concurrency=3 \
 --max-rate=1000 \
 --tolerate-errors \
 --warehouses=10 \
 --conns 60 \
 --ramp=1m \
 --workers=100 \
 $DATABASE_URL
Create a changefeed job:
CREATE CHANGEFEED FOR TABLE history INTO 's3://artemawsbucket/tpcc/history?AWS_ACCESS_KEY_ID=<AWS_ACCESS_KEY_ID>&AWS_SECRET_ACCESS_KEY=<AWS_SECRET_ACCESS_KEY>' with updated;
Then, navigate to your S3 bucket and find the files there.
Copy data from S3 to your filesystem.
aws s3 cp s3://artemawsbucket/tpcc/history . --recursive
Install duckdb:
brew install duckdb
Finally, navigate to the directory with the JSON files and start duckdb.
duckdb
Looking at the available JSON functions, the standard read_json_objects function works on individual files.
SELECT * FROM read_json_objects('202305161404194891609990000000000-fb5d1ff7b5a47331-2-15-00000000-history-a.ndjson');
│ {"after": {"h_amount": 10.00, "h_c_d_id": 8, "h_c_id": 2404, "h_c_w_id": 1, "h_d_id": 8, "h_data": "9v3L5bOacQHehuVoJHJ2vp… │
│ {"after": {"h_amount": 10.00, "h_c_d_id": 8, "h_c_id": 2431, "h_c_w_id": 1, "h_d_id": 8, "h_data": "ljve8BmeEvbQ5dJWLgvcp"… │
│ {"after": {"h_amount": 10.00, "h_c_d_id": 8, "h_c_id": 2382, "h_c_w_id": 1, "h_d_id": 8, "h_data": "ve8BmeEvbQ5dJWLgvcp", … │
Similarly, there's a newline-delimited function, read_ndjson_objects. This time, we're going to use globbing instead of individual files. We're also going to limit the output, as my entire dataset is 3 million rows.
SELECT * FROM read_ndjson_objects('*.ndjson') LIMIT 5;
┌──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│                                                              json                                                             │
│                                                              json                                                             │
├──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┤
│ {"after": {"h_amount": 10.00, "h_c_d_id": 10, "h_c_id": 1166, "h_c_w_id": 25, "h_d_id": 10, "h_data": "Z5x9v3L5bOacQHehuVo… │
│ {"after": {"h_amount": 10.00, "h_c_d_id": 10, "h_c_id": 1181, "h_c_w_id": 25, "h_d_id": 10, "h_data": "3L5bOacQHehuVoJHJ2v… │
│ {"after": {"h_amount": 10.00, "h_c_d_id": 10, "h_c_id": 1171, "h_c_w_id": 25, "h_d_id": 10, "h_data": "L5bOacQHehuVoJHJ2vp… │
│ {"after": {"h_amount": 10.00, "h_c_d_id": 10, "h_c_id": 1188, "h_c_w_id": 25, "h_d_id": 10, "h_data": "cQHehuVoJHJ2vp", "h… │
│ {"after": {"h_amount": 10.00, "h_c_d_id": 10, "h_c_id": 1184, "h_c_w_id": 25, "h_d_id": 10, "h_data": "VzccrxcAzZ5x9v3L5b"… │
└──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
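As a quick sanity check on the size of the dataset before materializing anything, a simple count over the same glob works (this is just an aggregate across all files):
SELECT count(*) FROM read_ndjson_objects('*.ndjson');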
We can create a DuckDB table out of the JSON files.
CREATE TABLE history AS SELECT * FROM read_ndjson_objects('*.ndjson');
show tables;
┌─────────┐
│  name   │
│ varchar │
├─────────┤
│ history │
└─────────┘
select json as col from history limit 5;
┌──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│                                                              col                                                              │
│                                                              json                                                             │
├──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┤
│ {"after": {"h_amount": 10.00, "h_c_d_id": 10, "h_c_id": 1166, "h_c_w_id": 25, "h_d_id": 10, "h_data": "Z5x9v3L5bOacQHehuVo… │
│ {"after": {"h_amount": 10.00, "h_c_d_id": 10, "h_c_id": 1181, "h_c_w_id": 25, "h_d_id": 10, "h_data": "3L5bOacQHehuVoJHJ2v… │
│ {"after": {"h_amount": 10.00, "h_c_d_id": 10, "h_c_id": 1171, "h_c_w_id": 25, "h_d_id": 10, "h_data": "L5bOacQHehuVoJHJ2vp… │
│ {"after": {"h_amount": 10.00, "h_c_d_id": 10, "h_c_id": 1188, "h_c_w_id": 25, "h_d_id": 10, "h_data": "cQHehuVoJHJ2vp", "h… │
│ {"after": {"h_amount": 10.00, "h_c_d_id": 10, "h_c_id": 1184, "h_c_w_id": 25, "h_d_id": 10, "h_data": "VzccrxcAzZ5x9v3L5b"… │
└──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
We can query the individual columns.
select json->'after'->'h_amount' from history limit 1;
┌─────────────────────────────────┐
│ "json" -> 'after' -> 'h_amount' │
│              json               │
├─────────────────────────────────┤
│ 10.0                            │
└─────────────────────────────────┘
We can cast, too.
select json->'after'->'h_data', cast (json->'after'->'h_c_id' as integer) as c_id from history where c_id > 2000 limit 5;
┌───────────────────────────────┬───────┐
│ "json" -> 'after' -> 'h_data' │ c_id  │
│             json              │ int32 │
├───────────────────────────────┼───────┤
│ "7xrljve8BmeEvbQ5dJW"         │  2002 │
│ "AzZ5x9v3L5bOac"              │  2001 │
│ "x9v3L5bOacQHehuVoJ"          │  2024 │
│ "2vp7xrljve8Bme"              │  2006 │
│ "UtEdpJzCGyo91sT"             │  2029 │
└───────────────────────────────┴───────┘
We can use the ->> notation to output values as varchar instead of JSON.
SELECT distinct(cast (json->>'after'->>'h_amount' as float)) FROM history LIMIT 5;
┌──────────────────────────────────────────────────────┐
│ CAST((("json" ->> 'after') ->> 'h_amount') AS FLOAT) │
│                        float                         │
├──────────────────────────────────────────────────────┤
│                                                 10.0 │
│                                              2612.12 │
│                                              3986.51 │
│                                              2836.18 │
│                                                359.5 │
└──────────────────────────────────────────────────────┘
Another useful JSON function is read_json_auto. It handles column types implicitly.
SELECT * FROM read_json_auto('*.ndjson');
┌──────────────────────────────────────────────┬──────────────────────────────────────────────┬────────────────────────────────┐
│                    after                     │                     key                      │            updated             │
│ struct(h_amount double, h_c_d_id ubigint, …  │                    json[]                    │            varchar             │
├──────────────────────────────────────────────┼──────────────────────────────────────────────┼────────────────────────────────┤
│ {'h_amount': 10.0, 'h_c_d_id': 10, 'h_c_id…  │ [25, "42674618-a16f-4000-8000-0000000bdfb5"] │ 1684245859489160999.0000000000 │
│ {'h_amount': 10.0, 'h_c_d_id': 10, 'h_c_id…  │ [25, "426799fb-7793-4c00-8000-0000000bdfc4"] │ 1684245859489160999.0000000000 │
│ {'h_amount': 10.0, 'h_c_d_id': 10, 'h_c_id…  │ [25, "4267620e-e8d1-4000-8000-0000000bdfba"] │ 1684245859489160999.0000000000 │
│ {'h_amount': 10.0, 'h_c_d_id': 10, 'h_c_id…  │ [25, "4267c121-0eb5-4800-8000-0000000bdfcb"] │ 1684245859489160999.0000000000 │
│ {'h_amount': 10.0, 'h_c_d_id': 10, 'h_c_id…  │ [25, "4267aac2-6f34-4400-8000-0000000bdfc7"] │ 1684245859489160999.0000000000 │
└──────────────────────────────────────────────┴──────────────────────────────────────────────┴────────────────────────────────┘
We can drill down to the individual array index level.
SELECT CAST (key->0 AS INTEGER) AS hkey FROM read_json_auto('*.ndjson') WHERE hkey = 25 LIMIT 5;
┌───────┐
│ hkey  │
│ int32 │
├───────┤
│    25 │
│    25 │
│    25 │
│    25 │
│    25 │
└───────┘
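Because read_json_auto infers after as a struct, we can also flatten the CDC payload into plain columns. Here's a minimal sketch; the history_typed name is mine, and the resulting columns are whatever the inference produces:
-- unnest on a struct emits one column per struct entry
CREATE TABLE history_typed AS
SELECT unnest(after) FROM read_json_auto('*.ndjson');
SELECT h_c_id, h_amount FROM history_typed LIMIT 5;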
This has a lot of promise, and I will be watching closely as DuckDB grows in popularity. It will definitely help in analyzing CDC output.
Query CockroachDB Tables Using DuckDB
DuckDB supports querying PostgreSQL directly using the PostgreSQL extension, and today I'd like to see if we can do the same by accessing CockroachDB.
duckdb
INSTALL postgres_scanner;
LOAD postgres_scanner;
CREATE SCHEMA abc;
CALL postgres_attach('dbname=defaultdb user=artem host=hostname port=26257 password=password sslmode=verify-full sslrootcert=certlocation', source_schema='public', sink_schema='abc');
┌─────────┐
│ Success │
│ boolean │
├─────────┤
│ 0 rows  │
└─────────┘
SELECT table_schema,table_name,table_type FROM information_schema.tables;
┌──────────────┬──────────────────┬────────────┐
│ table_schema │    table_name    │ table_type │
│   varchar    │     varchar      │  varchar   │
├──────────────┼──────────────────┼────────────┤
│ abc          │ pgbench_tellers  │ VIEW       │
│ abc          │ pgbench_history  │ VIEW       │
│ abc          │ pgbench_branches │ VIEW       │
│ abc          │ pgbench_accounts │ VIEW       │
│ abc          │ example          │ VIEW       │
└──────────────┴──────────────────┴────────────┘
PRAGMA show_tables;
┌──────────────────┐
│       name       │
│     varchar      │
├──────────────────┤
│ example          │
│ pgbench_accounts │
│ pgbench_branches │
│ pgbench_history  │
│ pgbench_tellers  │
└──────────────────┘
Query the tables directly, and make sure to specify the abc schema.
SELECT * FROM abc.pgbench_history LIMIT 5;
Error: Invalid Error: IO Error: Unable to query Postgres: ERROR: at or near "(": syntax error
DETAIL: source SQL:
COPY (SELECT "tid", "bid", "tbalance", "filler" FROM "public"."pgbench_tellers" WHERE ctid BETWEEN '(0,0)'::tid AND '(4294967295,0)'::tid ) TO STDOUT (FORMAT binary)
^
ERROR: at or near "(": syntax error
DETAIL: source SQL:
COPY (SELECT "tid", "bid", "tbalance", "filler" FROM "public"."pgbench_tellers" WHERE ctid BETWEEN '(0,0)'::tid AND '(4294967295,0)'::tid ) TO STDOUT (FORMAT binary)
This is where it starts to break. The problem stems from DuckDB needing the result returned with FORMAT binary. In CockroachDB 23.1, the COPY command works with the text and csv formats only. I've filed issues 1, 2, and 3 to add support for binary, json, and parquet.
demo@127.0.0.1:26257/defaultdb> COPY (SELECT * FROM test LIMIT 5) TO STDOUT (FORMAT csv);
1
2
3
4
5
demo@127.0.0.1:26257/defaultdb> COPY (SELECT * FROM test LIMIT 5) TO STDOUT (FORMAT text);
1
2
3
4
5
demo@127.0.0.1:26257/defaultdb> COPY (SELECT * FROM test LIMIT 5) TO STDOUT (FORMAT binary);
ERROR: unimplemented: binary format for COPY TO not implemented
SQLSTATE: 0A000
HINT: You have attempted to use a feature that is not yet implemented.
See: https://go.crdb.dev/issue-v/97180/v23.1
demo@127.0.0.1:26257/defaultdb> COPY (SELECT * FROM test LIMIT 5) TO STDOUT (FORMAT json);
invalid syntax: statement ignored: at or near "json": syntax error: unimplemented: this syntax
SQLSTATE: 0A000
DETAIL: source SQL:
COPY (SELECT * FROM test LIMIT 5) TO STDOUT (FORMAT json)
                                                    ^
HINT: You have attempted to use a feature that is not yet implemented.
See: https://go.crdb.dev/issue-v/96590/v23.1
demo@127.0.0.1:26257/defaultdb> COPY (SELECT * FROM test LIMIT 5) TO STDOUT (FORMAT parquet);
invalid syntax: statement ignored: at or near "parquet": syntax error: unimplemented: this syntax
SQLSTATE: 0A000
DETAIL: source SQL:
COPY (SELECT * FROM test LIMIT 5) TO STDOUT (FORMAT parquet)
                                                    ^
HINT: You have attempted to use a feature that is not yet implemented.
See: https://go.crdb.dev/issue-v/96590/v23.1
Unfortunately, the postgres_scanner does not work with text or csv, or at least I haven't found a way.
D COPY (SELECT * FROM abc.test) TO STDOUT (FORMAT csv);
Error: Invalid Error: IO Error: Unable to query Postgres: SSL SYSCALL error: EOF detected
SSL SYSCALL error: EOF detected
D COPY (SELECT * FROM abc.test) TO STDOUT (FORMAT text);
Error: Catalog Error: Copy Function with name text does not exist!
Did you mean "parquet"?
D COPY (SELECT * FROM abc.test) TO STDOUT (FORMAT parquet);
Error: Invalid Error: IO Error: Unable to query Postgres: SSL SYSCALL error: EOF detected
SSL SYSCALL error: EOF detected
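One workaround, sketched here under the assumption that the table has already been exported to a local history.csv file (for example, via cockroach sql --format=csv), is to let DuckDB ingest the CSV directly and bypass the scanner altogether:
-- history.csv is a hypothetical local export of the CockroachDB table
CREATE TABLE history_csv AS
SELECT * FROM read_csv_auto('history.csv', header = true);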
Conclusion
Your mileage may vary. This was a fun experiment, and I will be paying close attention as this project matures.