Clean Up Your Outbox Tables With Programmatic TTL
The goal of this post is to show how you can programmatically remove records from an Outbox table that have been flushed to its sink (e.g., Kafka).
Join the DZone community and get the full member experience.
Join For FreeFor those familiar with the Outbox Pattern, CockroachDB provides some unique capabilities for handling these types of architectural patterns. One common method is to use Changefeeds in CockroachDB to send an acknowledgment message back to the originating service that the database transaction was committed. Changefeeds are great in this scenario in that they can be emitted on a record mutation on the table (except Import), connect to a message bus like Kafka, and emit the payload in a mildly low latent (~100ms) fashion. However, one circumstance of this pattern is having historical records build up in the Outbox table. Fortunately, we have a rather nifty solution that can clean up these Outbox tables.
So the goal in this post is to show how you can programmatically remove records from an Outbox table that have been flushed to its sink (i.e Kafka). The idea here is to create a clean-up job that removes records where the MVCC timestamp of an Outbox record is adequately past the high watermark of a Changefeed.
If you want to cut the chase, proceed directly to Step 3 to find the TTL statement to execute. Steps 1 and 2 show how you can find the details in the CockroachDB catalog.
Lastly, the steps below use S3 as a sink instead of Kafka but it can easily be repurposed to use Kafka by changing the sink target in the Changefeed.
High-Level Steps
- Get the list of outbox tables that need TTL
- Find the active
Changefeed
for those tables - For each table, delete rows where the mvcc_internal timestamp of the row is < the high watermark of the
Changefeed
You can run this test using CockroachCloud or CockroachDB. If using CockroachDB, I would recommend trying this using cockroach demo.
cockroach demo --nodes 3 --empty
Step 0 - Create a Schema To Test With
For this schema, we'll create 1 decoy table and 3 outbox tables. We'll also insert some initial records, create a Changefeed on only 2 outbox tables and then add more data after the Changefeeds are created. We want to show how the SQL TTL script only picks up Outbox tables with Changefeeds. Notice how one Changefeed includes two tables: outbox_t2 and outbox_t3. We'll make sure to pick up both of the tables.
set cluster setting kv.rangefeed.enabled = true;
create table test_t0 (i int primary key);
create table outbox_t1 (i int primary key);
create table outbox_t2 (i int primary key);
create table outbox_t3 (i int primary key);
insert into test_t0 values (unique_rowid());
insert into outbox_t1 values (unique_rowid());
insert into outbox_t2 values (unique_rowid());
insert into outbox_t3 values (unique_rowid());
CREATE CHANGEFEED FOR TABLE test_t0 INTO 'experimental-s3://chrisc-test/changefeed/ttl/test?AUTH=implicit' WITH updated, resolved = '1m';
CREATE CHANGEFEED FOR TABLE outbox_t1 INTO 'experimental-s3://chrisc-test/changefeed/ttl/outbox1?AUTH=implicit' WITH updated, resolved = '1m';
CREATE CHANGEFEED FOR TABLE outbox_t2, outbox_t3 INTO 'experimental-s3://chrisc-test/changefeed/ttl/outbox2?AUTH=implicit' WITH updated, resolved = '1m';
insert into test_t0 values (unique_rowid());
insert into outbox_t1 values (unique_rowid());
insert into outbox_t2 values (unique_rowid());
insert into outbox_t3 values (unique_rowid());
Let's also verify the Changefeeds are sending data to our sink:
aws s3 ls s3://chrisc-test/changefeed/ttl/ --recursive | grep ndjson
The output should look like this:
2021-08-19 21:46:33 222 changefeed/ttl/outbox1/2021-08-20/202108200145322858970000000000000-fad43b3554ca0a0b-1-5-00000000-outbox_t1-1.ndjson
2021-08-19 22:05:35 180 changefeed/ttl/outbox1/2021-08-20/202108200204308217750000000000001-fad43b3554ca0a0b-1-5-00000001-outbox_t1-1.ndjson
2021-08-19 21:46:33 222 changefeed/ttl/outbox2/2021-08-20/202108200145325129600000000000000-212aeaca8652f3bc-1-8-00000000-outbox_t2-1.ndjson
2021-08-19 21:46:33 222 changefeed/ttl/outbox2/2021-08-20/202108200145325129600000000000000-212aeaca8652f3bc-1-8-00000001-outbox_t3-1.ndjson
2021-08-19 22:05:35 180 changefeed/ttl/outbox2/2021-08-20/202108200204310221340000000000001-212aeaca8652f3bc-1-8-00000002-outbox_t2-1.ndjson
2021-08-19 22:05:35 180 changefeed/ttl/outbox2/2021-08-20/202108200204310221340000000000001-212aeaca8652f3bc-1-8-00000003-outbox_t3-1.ndjson
2021-08-19 21:46:33 222 changefeed/ttl/test/2021-08-20/202108200145319247430000000000000-64e78d6565e58782-1-2-00000000-test_t0-1.ndjson
Step 1 - Get the List of Outbox Tables That Need TTL
This is a simple selection that queries the internal catalog of CockroachDB to find tables that have an 'outbox' prefix.
select table_catalog, table_name
from information_schema.tables
where table_name like 'outbox%';
And the output should be:
table_catalog | table_name
----------------+-------------
defaultdb | outbox_t1
defaultdb | outbox_t2
defaultdb | outbox_t3
(3 rows)
Pretty simple.
Step 2 - Find the Active Changefeeds for the Outbox Tables
Again, let's query CockroachDB's internal catalog to see which tables that are prefixed with 'outbox' also have a running Changefeed.
select
j.job_id,
n."parentID",
n2.name as "database",
j.id,
n.name as "table",
j.high_water_timestamp
from system.namespace n
inner join
(
select job_id, unnest(descriptor_ids) as id, high_water_timestamp
from crdb_internal.jobs
where "job_type" = 'CHANGEFEED'
and "status" = 'running'
) j
on j.id = n.id
inner join
system.namespace n2
on n."parentID" = n2.id
where n."parentID" != 0
and n.name like 'outbox%'
;
The output should be something like this:
job_id | parentID | database | id | table | high_water_timestamp
---------------------+----------+-----------+----+-----------+---------------------------------
686009654600433665 | 50 | defaultdb | 53 | outbox_t1 | 1629424169484287000.0000000000
686009655343546369 | 50 | defaultdb | 55 | outbox_t3 | 1629424169684500000.0000000000
686009655343546369 | 50 | defaultdb | 54 | outbox_t2 | 1629424169684500000.0000000000
(3 rows)
Time: 10ms total (execution 10ms / network 0ms)
Step 3 - Create Those TTL-ish Delete Statements
And this is really the only statement you need to run. This will create the delete statements of records to delete in the outbox table. The records that will be deleted will compare the high water timestamp of the change feed to the mvcc timestamp of the record. The high water timestamp of the Changefeed is the checkpoint that indicates records that have been sent to their sink up to a particular timestamp. This is one of my favorite queries...
select
'delete from ' || n2.name || '.' || n.name || ' where crdb_internal_mvcc_timestamp < ' || j.high_water_timestamp::STRING || ';' as "SQL"
from system.namespace n
inner join
(
select job_id, unnest(descriptor_ids) as id, high_water_timestamp
from crdb_internal.jobs
where "job_type" = 'CHANGEFEED'
and "status" = 'running'
) j
on j.id = n.id
inner join
system.namespace n2
on n."parentID" = n2.id
where n."parentID" != 0
and n.name like 'outbox%'
;
The output here will create the delete statements for you to run:
SQL
-------------------------------------------------------------------------------------------------------
delete from defaultdb.outbox_t1 where crdb_internal_mvcc_timestamp < 1629424830259619000.0000000000
delete from defaultdb.outbox_t3 where crdb_internal_mvcc_timestamp < 1629424830459942000.0000000000
delete from defaultdb.outbox_t2 where crdb_internal_mvcc_timestamp < 1629424830459942000.0000000000
(3 rows)
Step 4 - Run the Delete Statements
This is the last step where we remove the records that have already been emitted to our sink.
> delete from defaultdb.outbox_t1 where crdb_internal_mvcc_timestamp < 1629425070821775000.0000000000;
delete from defaultdb.outbox_t3 where crdb_internal_mvcc_timestamp < 1629425071022134000.0000000000;
delete from defaultdb.outbox_t2 where crdb_internal_mvcc_timestamp < 1629425071022134000.0000000000;
DELETE 2
Time: 4ms total (execution 4ms / network 0ms)
DELETE 2
Time: 2ms total (execution 2ms / network 0ms)
DELETE 2
Time: 3ms total (execution 3ms / network 0ms)
Step 5 - Clean Up
Lastly, let's clean up our files that we placed in S3.
aws s3 rm s3://chrisc-test/changefeed/ttl/ --recursive
Conclusion
You can easily create a program to use the query in Step 3, execute it, then take the SQL output and execute the generated statements. Below is a quick and dirty example of how you can do this in Python.
import psycopg2
conn = psycopg2.connect(database="defaultdb", user="demo", password="demo61304" ,host="localhost", port=26257)
conn.set_session(autocommit=True)
cur = conn.cursor()
sql=""" select 'delete from ' || n2.name || '.' || n.name || ' where crdb_internal_mvcc_timestamp < ' || j.high_water_timestamp::STRING || ';' as "SQL" from system.namespace n inner join ( select job_id, unnest(descriptor_ids) as id, high_water_timestamp from crdb_internal.jobs where "job_type" = 'CHANGEFEED' and "status" = 'running' ) j on j.id = n.id inner join system.namespace n2 on n."parentID" = n2.id where n."parentID" != 0 and n.name like 'outbox%'; """
cur.execute(sql)
ttl=cur.fetchall()
ttlsql='; '.join(map(str,ttl)).replace("'","").replace("(","").replace(";,)","")
cur.execute(ttlsql)
cur.statusmessage
cur.close()
conn.close()
I hope you enjoyed this creative way of removing historical records from your Outbox tables!
Published at DZone with permission of Chris Casano. See the original article here.
Opinions expressed by DZone contributors are their own.
Comments