Handling Real-Time Updates in ClickHouse
In this article, see how to handle real-time updates in ClickHouse by looking at a use case.
Even now, ClickHouse updates are asynchronous, which makes them difficult to use in interactive applications. Still, in many use cases users need to apply modifications to existing data and expect to see the effect immediately. Can ClickHouse do that? Sure it can.
A Short History of ClickHouse Updates
Back in 2016, the ClickHouse team published an article titled “How To Update Data in ClickHouse.” ClickHouse did not support data modifications at that time. Only special insert structures could be used in order to emulate updates, and data had to be dropped by partitions.
Under the pressure of GDPR requirements, the ClickHouse team delivered UPDATEs and DELETEs in 2018. The follow-up article, Updates and Deletes in ClickHouse, is still one of the most-read articles on the Altinity blog. Those asynchronous, non-atomic updates are implemented as ALTER TABLE UPDATE statements and can potentially shuffle a lot of data. This is useful for bulk operations and infrequent updates, when immediate results are not needed. “Normal” SQL updates are still missing in ClickHouse, though they reliably appear in the roadmap every year. If real-time update behavior is required, we have to use a different approach. Let’s consider a practical use case and compare different ways of doing it in ClickHouse.
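For reference, such a bulk update is expressed as a mutation statement. Below is a minimal sketch, assuming a table with columns like the alerts table defined in the next section; the statement returns immediately, while the affected data parts are rewritten in the background.
-- Hypothetical example: acknowledge old alerts in bulk with an asynchronous mutation.
-- Column names match the alerts table used throughout this article.
ALTER TABLE alerts
    UPDATE acked = 1, ack_user = 'user42', ack_time = now()
    WHERE NOT acked AND timestamp < '2020-01-15 00:00:00';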
Use Case
Consider a system that generates various kinds of alerts. Users or machine learning algorithms query the database from time to time to review new alerts and acknowledge them. Acknowledgment operations need to modify the alert record in the database. Once acknowledged, alerts should disappear from the users’ views. This looks like an OLTP operation that is alien to ClickHouse.
Since we cannot use updates, we will have to insert a modified record instead. Once two records are in the database, we need an efficient way to get the latest one. For that, we will try three different approaches:
ReplacingMergeTree
Aggregate functions
AggregatingMergeTree
ReplacingMergeTree
Let’s start by creating a table that stores alerts.
CREATE TABLE alerts(
tenant_id UInt32,
alert_id String,
timestamp DateTime Codec(Delta, LZ4),
alert_data String,
acked UInt8 DEFAULT 0,
ack_time DateTime DEFAULT toDateTime(0),
ack_user LowCardinality(String) DEFAULT ''
)
ENGINE = ReplacingMergeTree(ack_time)
PARTITION BY tuple()
ORDER BY (tenant_id, timestamp, alert_id);
For simplicity, all alert-specific columns are packaged into a generic alert_data column, but you can imagine that an alert may contain dozens or even hundreds of columns. Also, alert_id is a random string in our example.
Note the ReplacingMergeTree engine. ReplacingMergeTree is a special table engine that replaces data by primary key (ORDER BY) -- the newer version of a row with the same key value replaces the older one. ‘Newness’ is determined by a column, ‘ack_time’ in our case. The replacement is performed during a background merge operation. It does not happen immediately, and there is no guarantee it happens at all, so consistency of the query results is a concern. ClickHouse has special syntax to work with such tables, though, and we will be using it in the queries below.
Before we run queries, let’s fill the table with some data. We generate 10M alerts for 1000 tenants:
INSERT INTO alerts(tenant_id, alert_id, timestamp, alert_data)
SELECT
toUInt32(rand(1)%1000+1) AS tenant_id,
randomPrintableASCII(64) as alert_id,
toDateTime('2020-01-01 00:00:00') + rand(2)%(3600*24*30) as timestamp,
randomPrintableASCII(1024) as alert_data
FROM numbers(10000000);
Next, let’s acknowledge 99% of alerts, providing new values for ‘acked’, ‘ack_user’ and ‘ack_time’ columns. Instead of an update, we just insert a new row.
INSERT INTO alerts (tenant_id, alert_id, timestamp, alert_data, acked, ack_user, ack_time)
SELECT tenant_id, alert_id, timestamp, alert_data,
1 as acked,
concat('user', toString(rand()%1000)) as ack_user, now() as ack_time
FROM alerts WHERE cityHash64(alert_id) % 99 != 0;
If we query this table now, we will see something like:
SELECT count() FROM alerts
┌──count()─┐
│ 19898060 │
└──────────┘
1 rows in set. Elapsed: 0.008 sec.
So we definitely have both acknowledged and non-acknowledged rows in the table, which means the replacement has not happened yet. In order to see the ‘real’ data, we have to add the FINAL keyword.
SELECT count() FROM alerts FINAL
┌──count()─┐
│ 10000000 │
└──────────┘
1 rows in set. Elapsed: 3.693 sec. Processed 19.90 million rows, 1.71 GB (5.39 million rows/s., 463.39 MB/s.)
The count is correct now, but look at the query time! With FINAL, ClickHouse has to scan all rows and merge them by primary key at query time. That produces the correct answer but with a lot of overhead. Let’s see if we can do better by filtering only rows that have not been acknowledged.
SELECT count() FROM alerts FINAL WHERE NOT acked
┌─count()─┐
│ 101940 │
└─────────┘
1 rows in set. Elapsed: 3.570 sec. Processed 19.07 million rows, 1.64 GB (5.34 million rows/s., 459.38 MB/s.)
The query time and the amount of data processed are the same, even though the count is much smaller. Filtering does not help to speed up the query. As the table size grows, the cost may become even more substantial. It does not scale.
Note: for the sake of readability, all queries and query times are presented as if they were run in ‘clickhouse-client’. In fact, we ran the queries multiple times to make sure the results are consistent, and confirmed them with the ‘clickhouse-benchmark’ utility as well.
Ok, querying the entire table is not that helpful. Can we still use ReplacingMergeTree for our use case? Let’s pick a random tenant_id, and select all records that were not acknowledged yet -- imagine there is a dashboard that the user is looking into. I like Ray Bradbury, so I picked 451. Since ‘alert_data’ is just random garbage, we will calculate a checksum, and will use it to confirm that the results are the same across multiple approaches:
SELECT
count(),
sum(cityHash64(*)) AS data
FROM alerts FINAL
WHERE (tenant_id = 451) AND (NOT acked)
┌─count()─┬─────────────────data─┐
│ 90 │ 18441617166277032220 │
└─────────┴──────────────────────┘
1 rows in set. Elapsed: 0.278 sec. Processed 106.50 thousand rows, 119.52 MB (383.45 thousand rows/s., 430.33 MB/s.)
That was pretty fast! In 278 ms we could query all non-acknowledged data. Why is it fast this time? The difference is in the filter condition. ‘tenant_id’ is part of the primary key, so ClickHouse can filter data before FINAL. In this case, ReplacingMergeTree becomes efficient.
Let’s try a user filter as well and query the number of alerts acknowledged by a particular user. The cardinality of the column is the same -- we have 1000 users and can try user451.
SELECT count() FROM alerts FINAL
WHERE (ack_user = 'user451') AND acked
┌─count()─┐
│ 9725 │
└─────────┘
1 rows in set. Elapsed: 4.778 sec. Processed 19.04 million rows, 1.69 GB (3.98 million rows/s., 353.21 MB/s.)
That is very slow now because the index is not used. ClickHouse scanned all 19.04 million rows. Note that we cannot add ‘ack_user’ to the index, since it will break ReplacingMergeTree semantics. We can do a trick with PREWHERE, though:
SELECT count() FROM alerts FINAL
PREWHERE (ack_user = 'user451') AND acked
┌─count()─┐
│ 9725 │
└─────────┘
1 rows in set. Elapsed: 0.639 sec. Processed 19.04 million rows, 942.40 MB (29.80 million rows/s., 1.48 GB/s.)
PREWHERE is a special hint for ClickHouse to apply a filter differently. Usually ClickHouse is smart enough to move conditions to PREWHERE automatically, so a user should not need to care. It did not happen this time, so it’s good we checked.
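As a side note, newer ClickHouse versions (roughly 20.6 and later) support EXPLAIN, which gives a quick way to see what the optimizer did with a condition: the rewritten query is printed back, and a PREWHERE clause should appear in the output if the move happened automatically. A minimal sketch, assuming such a version:
-- Print the query after rewrites; look for a PREWHERE clause in the output.
EXPLAIN SYNTAX
SELECT count()
FROM alerts FINAL
WHERE (ack_user = 'user451') AND acked;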
Aggregate Functions
ClickHouse is known for supporting a wide variety of aggregate functions. In the latest versions, there are more than 100 of them. Combined with 9 aggregate function combinators (see https://clickhouse.tech/docs/en/query_language/agg_functions/combinators/), this gives enormous flexibility to an experienced user. For this use case, we do not need anything advanced and will be using only 3 functions: ‘argMax’, ‘max’ and ‘any’.
The same query for the 451st tenant can be executed with an ‘argMax’ aggregate function as follows:
SELECT count(), sum(cityHash64(*)) data FROM (
SELECT tenant_id, alert_id, timestamp,
argMax(alert_data, ack_time) alert_data,
argMax(acked, ack_time) acked,
max(ack_time) ack_time_,
argMax(ack_user, ack_time) ack_user
FROM alerts
GROUP BY tenant_id, alert_id, timestamp
)
WHERE tenant_id=451 AND NOT acked;
┌─count()─┬─────────────────data─┐
│ 90 │ 18441617166277032220 │
└─────────┴──────────────────────┘
1 rows in set. Elapsed: 0.059 sec. Processed 73.73 thousand rows, 82.74 MB (1.25 million rows/s., 1.40 GB/s.)
Same result, same number of rows, but 4 times better performance! This is ClickHouse aggregation efficiency. The downside is that the query becomes more complex. But we can make it simpler.
Note that when acknowledging an alert, we only update 3 columns:
acked: 0 => 1
ack_time: 0 => now()
ack_user: '' => 'user1'
In all 3 cases, the column value increases! So instead of the somewhat bulky ‘argMax’ we can use ‘max.’ Since we do not change ‘alert_data,’ we do not need any actual aggregation on this column. ClickHouse has a nice ‘any’ aggregate function for this purpose. It picks any value without extra overhead:
SELECT count(), sum(cityHash64(*)) data FROM (
SELECT tenant_id, alert_id, timestamp,
any(alert_data) alert_data,
max(acked) acked,
max(ack_time) ack_time,
max(ack_user) ack_user
FROM alerts
GROUP BY tenant_id, alert_id, timestamp
)
WHERE tenant_id=451 AND NOT acked;
┌─count()─┬─────────────────data─┐
│ 90 │ 18441617166277032220 │
└─────────┴──────────────────────┘
1 rows in set. Elapsed: 0.055 sec. Processed 73.73 thousand rows, 82.74 MB (1.34 million rows/s., 1.50 GB/s.)
The query becomes simpler, and it is slightly faster! The reason is that with the ‘any’ function, ClickHouse does not need to calculate ‘max’ on the ‘alert_data’ column!
AggregatingMergeTree
AggregatingMergeTree is one of the most powerful ClickHouse features. When coupled with materialized views, it enables real-time data aggregation. Since we used aggregate functions in the previous approach, can we make it even better with AggregatingMergeTree? Actually, it’s not much of an improvement.
We only update a row once, so there are only two rows to aggregate per group. For this scenario, AggregatingMergeTree is not the best option. We can do a trick, however. We know that alerts are always inserted as non-acknowledged first and become acknowledged later. Once a user acknowledges an alert, only 3 columns need to be modified. Can we save disk space and improve performance if we do not duplicate data for the other columns?
Let’s create a table that implements the aggregation using the ‘max’ aggregate function. Instead of ‘max’, we could also use ‘any’, but that would require columns to be Nullable -- ‘any’ would pick a not-null value.
DROP TABLE IF EXISTS alerts_amt_max;
CREATE TABLE alerts_amt_max (
tenant_id UInt32,
alert_id String,
timestamp DateTime Codec(Delta, LZ4),
alert_data SimpleAggregateFunction(max, String),
acked SimpleAggregateFunction(max, UInt8),
ack_time SimpleAggregateFunction(max, DateTime),
ack_user SimpleAggregateFunction(max, LowCardinality(String))
)
Engine = AggregatingMergeTree()
ORDER BY (tenant_id, timestamp, alert_id);
Since the original data was random, we will populate the new table using existing data from ‘alerts’. We will do it in two inserts, like before, one for non-acknowledged alerts and another one for acknowledged:
INSERT INTO alerts_amt_max SELECT * FROM alerts WHERE NOT acked;
INSERT INTO alerts_amt_max
SELECT tenant_id, alert_id, timestamp,
'' as alert_data,
acked, ack_time, ack_user
FROM alerts WHERE acked;
Note that we insert an empty string instead of ‘alert_data’ for acknowledged events. We know that the data does not change, and we can store it only once! The aggregate function will fill the gap. In a real application, we could just skip all columns that do not change and let them get default values, as sketched below.
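For illustration, a hypothetical acknowledgment insert in such an application could look as follows: only the key columns and the three changing columns are supplied, ‘alert_data’ falls back to its type default (an empty string), and the ‘max’ aggregation restores the original value at merge or query time. The key columns must repeat the values of the original row exactly, since they form the ORDER BY key.
-- Hypothetical acknowledgment insert: alert_id and timestamp are illustrative values
-- that must match the original alert row; omitted columns get their defaults.
INSERT INTO alerts_amt_max (tenant_id, alert_id, timestamp, acked, ack_time, ack_user)
VALUES (451, 'SOME_ALERT_ID', '2020-01-15 10:00:00', 1, now(), 'user451');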
Once we have the data, let’s check the data sizes first:
SELECT
table,
sum(rows) AS r,
sum(data_compressed_bytes) AS c,
sum(data_uncompressed_bytes) AS uc,
uc / c AS ratio
FROM system.parts
WHERE active AND (database = 'last_state')
GROUP BY table
┌─table──────────┬────────r─┬───────────c─┬──────────uc─┬──────────────ratio─┐
│ alerts │ 19039439 │ 20926009562 │ 21049307710 │ 1.0058921003373666 │
│ alerts_amt_max │ 19039439 │ 10723636061 │ 10902048178 │ 1.0166372782501314 │
└────────────────┴──────────┴─────────────┴─────────────┴────────────────────┘
Well, we have almost no compression, thanks to the random strings. But the aggregate table is two times smaller, since we do not have to store ‘alert_data’ twice.
Now let’s try the query over the aggregate table:
SELECT count(), sum(cityHash64(*)) data FROM (
SELECT tenant_id, alert_id, timestamp,
max(alert_data) alert_data,
max(acked) acked,
max(ack_time) ack_time,
max(ack_user) ack_user
FROM alerts_amt_max
GROUP BY tenant_id, alert_id, timestamp
)
WHERE tenant_id=451 AND NOT acked;
┌─count()─┬─────────────────data─┐
│ 90 │ 18441617166277032220 │
└─────────┴──────────────────────┘
1 rows in set. Elapsed: 0.036 sec. Processed 73.73 thousand rows, 40.75 MB (2.04 million rows/s., 1.13 GB/s.)
Thanks to AggregatingMergeTree, we process less data (40MB vs 82MB before) and it is now even more efficient.
Materializing The Update
ClickHouse will do its best to merge data in the background, removing duplicate rows and performing aggregation. Sometimes, however, it makes sense to force the merge, in order to release disk space, for example. This can be done with the OPTIMIZE FINAL statement. OPTIMIZE is a blocking and expensive operation, therefore it cannot be performed too often. Let’s see if it makes any difference for the query performance.
OPTIMIZE TABLE alerts FINAL
Ok.
0 rows in set. Elapsed: 105.675 sec.
OPTIMIZE TABLE alerts_amt_max FINAL
Ok.
0 rows in set. Elapsed: 70.121 sec.
After OPTIMIZE FINAL, both tables have the same number of rows and identical data.
┌─table──────────┬────────r─┬───────────c─┬──────────uc─┬────────────ratio─┐
│ alerts │ 10000000 │ 10616223201 │ 10859490300 │ 1.02291465565429 │
│ alerts_amt_max │ 10000000 │ 10616223201 │ 10859490300 │ 1.02291465565429 │
└────────────────┴──────────┴─────────────┴─────────────┴──────────────────┘
The difference in performance between the approaches becomes less notable. Here is the summary table (query times in seconds):
Approach                  | After inserts | After OPTIMIZE FINAL
ReplacingMergeTree FINAL  | 0.278         | 0.037
argMax                    | 0.059         | 0.034
any/max                   | 0.055         | 0.029
AggregatingMergeTree      | 0.036         | 0.026
Conclusion
ClickHouse provides a rich toolset to handle real-time updates such as ReplacingMergeTree, CollapsingMergeTree (not reviewed here), AggregatingMergeTree and aggregate functions. All those approaches have three common properties:
Data is “modified” by inserting the new version. Inserts in ClickHouse are extremely fast.
There are efficient ways to emulate update semantics similar to those of OLTP databases.
However, the actual modification does not happen immediately.
The choice of the particular approach depends on the application use case. ReplacingMergeTree is straightforward and the most convenient for the user, but it may only be used for small- to medium-sized tables, or if the data is always queried by the primary key. The use of aggregate functions gives more flexibility and performance but requires quite a lot of query rewriting. Finally, AggregatingMergeTree allows saving storage, keeping only the modified columns. These are good tools to have in the arsenal of a ClickHouse DB designer and to apply when needed.
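One small addition, not part of the original comparison: if the aggregation-based rewrite feels too intrusive for application code, a regular (non-materialized) view can hide it. This is just a convenience sketch over the ‘alerts_amt_max’ table above; whether filters are pushed down into the view efficiently depends on the ClickHouse version and settings.
-- Convenience view over the aggregating table; applications query it like a plain table.
CREATE VIEW alerts_last AS
SELECT tenant_id, alert_id, timestamp,
       max(alert_data) AS alert_data,
       max(acked) AS acked,
       max(ack_time) AS ack_time,
       max(ack_user) AS ack_user
FROM alerts_amt_max
GROUP BY tenant_id, alert_id, timestamp;

-- Example usage:
SELECT count() FROM alerts_last WHERE tenant_id = 451 AND NOT acked;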
Published at DZone with permission of Alexander Zaitsev.