Apache Flink: Full Checkpoint vs Incremental Checkpoint
This article will analyze the benefits a streaming application can gain when configured to run with incremental checkpointing.
Apache Flink is a real-time data stream processing engine. Most stream processing applications are ‘stateful,’ meaning the state is stored and used for further processing. In Apache Flink, the state is managed through a configured state backend. Flink supports two state backends in production: one is the HashMapStateBackend, and the other is the EmbeddedRocksDBStateBackend.
To prevent data loss and achieve fault tolerance, Flink can persist snapshots of the state to a durable storage. Flink can be configured to snapshot either the entire state into a durable location or the delta since the last snapshot. The former is called full checkpoint, and the latter is known as the incremental checkpoint.
In this article, we are going to compare HashMapStateBackend with full checkpointing and EmbeddedRocksDBStateBackend with incremental checkpointing. This article assumes that the audience has either working knowledge or theoretical knowledge of Apache Flink.
Overview of Flink State Backend
To understand the Flink state backend, it is important to know the difference between in-flight state and state snapshots.
The in-flight state is known as Flink’s working state and is stored locally. Based on the state backend configuration, it is kept either in heap memory or in off-heap memory, with a possible spillover to the local disk.
On the other hand, state snapshots (checkpoint or save point) are stored in a durable remote location. These snapshots are used to reconstruct the Flink job state in case of a job failure.
The in-flight state can be lost if the job fails. This doesn’t impact job recovery as long as checkpointing is enabled: at recovery time, the state is retrieved from durable storage.
Which state backend to select for production depends on the application’s requirements for throughput, latency, and scalability.
There are two state backends that Apache Flink supports in production.
1. HashMapStateBackend
It is a lightweight state backend in Flink for managing the Keyed State and Operator State during stream processing. The state is stored on the Java heap using a HashMap data structure. Since the state is kept in memory, the main constraint is that the maximum state size is limited by the Java heap size. There is no serialization involved in writing to or reading from the state, so this backend suits applications that need low latency and high throughput and whose state is not very large.
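For illustration, here is a minimal sketch of selecting this backend in the DataStream API. The class and method names come from Flink’s public API (Flink 1.13+); the tiny pipeline around it is hypothetical and only there to make the example self-contained.

```java
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class HashMapBackendSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Keep the working (in-flight) state as plain Java objects on the task manager heap.
        env.setStateBackend(new HashMapStateBackend());

        // Hypothetical pipeline: the keyed state created by reduce() lives on the heap.
        env.fromElements(1, 2, 3, 4)
           .keyBy(n -> n % 2)
           .reduce(Integer::sum)
           .print();

        env.execute("hashmap-state-backend-sketch");
    }
}
```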
2. EmbeddedRocksDBStateBackend
This state backend stores the in-flight data in an embedded RocksDB database. By default, RocksDB keeps the data in off-heap memory and spills over to a local disk attached to the task manager. The data is serialized before it is written, and the serialization format depends on the type serializer configured in the application.
With this state backend, the amount of state that can be stored is limited only by the disk space attached to the task manager. If the application has a huge state that can’t be contained in the heap memory, this is the right state backend. Since serialization is involved, the application will have higher latency and lower throughput compared to HashMapStateBackend.
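A corresponding sketch for the RocksDB backend is shown below. Note that the class ships in the flink-statebackend-rocksdb dependency, and its package differs across Flink versions; the import below assumes Flink 1.13–1.17.

```java
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDBBackendSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // State is kept in embedded RocksDB instances: serialized, off-heap,
        // and spilled to the task manager's local disk as it grows.
        env.setStateBackend(new EmbeddedRocksDBStateBackend());

        // ... define sources, keyed operators, and sinks here, then call env.execute() ...
    }
}
```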
Overview of Snapshot State
The snapshot represents the global state of the Flink Job. This consists of a pointer to each data source and the state of all Flink’s stateful operators after processing up to those pointers from sources. Checkpointing in Apache Flink is a mechanism to achieve fault tolerance by periodically saving the state to durable remote storage.
In case of job failure, Flink retrieves the stored state from durable remote storage and starts processing the streaming data from where it left off. Flink uses asynchronous barrier snapshotting. It is a variant of the Chandy-Lamport algorithm.
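Checkpointing is switched on per job through the checkpoint configuration. The sketch below uses Flink’s public API; the 3-minute interval is illustrative, and the S3 bucket path is hypothetical.

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Trigger a checkpoint every 3 minutes with exactly-once guarantees.
        env.enableCheckpointing(180_000L, CheckpointingMode.EXACTLY_ONCE);

        // Durable remote storage for the snapshots (hypothetical bucket).
        env.getCheckpointConfig().setCheckpointStorage("s3://my-bucket/flink-checkpoints");

        // Tolerate a couple of slow or failed checkpoints before failing the job.
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);

        // ... pipeline definition and env.execute() ...
    }
}
```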
Flink supports two types of checkpointing.
1. Full Checkpointing
Full checkpointing is where the entire state of the Flink job is captured and stored in durable remote storage. In case of a job failure, the job recovers from the previously stored state. The storage space required and the time taken to checkpoint depend entirely on the size of the application state. Full checkpointing works with both HashMapStateBackend and RocksDBStateBackend.
2. Incremental Checkpointing
Incremental checkpointing is an optimized approach. Instead of snapshotting the entire state, Flink saves only the ‘deltas’ made to the state since the last checkpoint. This reduces the network overhead and, thus, the time taken for checkpointing. Checkpointing happens fully asynchronously in this case.
Only RocksDBStateBackend supports incremental checkpointing; Flink leverages RocksDB’s internal mechanism for this. Even though incremental checkpointing takes less time than full checkpointing, in case of a job failure, the recovery time depends on many factors. If the network is a bottleneck, the recovery time can be higher than recovery from a full checkpoint.
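Incremental checkpointing is opted into when the RocksDB backend is created (or, equivalently, via the state.backend.incremental configuration option). A minimal sketch, assuming the same flink-statebackend-rocksdb dependency as above:

```java
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IncrementalCheckpointSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 'true' enables incremental checkpoints: only the RocksDB files created
        // since the last checkpoint are uploaded to the durable checkpoint storage.
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));

        // Checkpoint every 3 minutes (illustrative interval).
        env.enableCheckpointing(180_000L);

        // ... pipeline definition and env.execute() ...
    }
}
```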
Analysis
Pipeline details: an Apache Beam pipeline running on the Flink engine, with a fixed window of 10 minutes and checkpointing configured to run every 3 minutes. The serialization type configured is Avro. A plain-Flink sketch of this setup is shown after the list below.
- Cluster instance type: m5dn.4xlarge (AWS EC2)
- Final checkpoint storage: S3
- Number of unique keys: 2K
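The Beam pipeline itself is not reproduced here, but a plain-Flink approximation of the setup (3-minute checkpoints, a 10-minute fixed window, a hypothetical stand-in source, and a hypothetical S3 path; the Avro wiring is omitted) would look roughly like this:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class ExperimentSetupSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 3 minutes, matching the experiment.
        env.enableCheckpointing(3 * 60 * 1000L);
        // Checkpoints stored in S3 (hypothetical bucket path).
        env.getCheckpointConfig().setCheckpointStorage("s3://my-bucket/flink-checkpoints");

        // A 10-minute fixed (tumbling) window over a keyed stream.
        // The bounded source below is a stand-in for illustration only;
        // the real experiment consumed an unbounded input stream.
        env.fromElements("key-1,1", "key-2,2")
           .keyBy(line -> line.split(",")[0])
           .window(TumblingProcessingTimeWindows.of(Time.minutes(10)))
           .reduce((a, b) -> a + "|" + b)
           .print();

        env.execute("checkpoint-experiment-sketch");
    }
}
```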
Input rate 10k/s (~7.8 MB/s)

| Type | No. of TMs | Parallelism | Heap Allocation per TM | Heap Usage per TM | Pod Memory Usage per TM | CPU Usage per TM | Checkpoint Size | Checkpoint Duration | Flink Managed Memory |
|---|---|---|---|---|---|---|---|---|---|
| HashMapState with Full Checkpoint | 1 | 1 | 10 GB | 8.5 GB | 11.1 GB | 1.2 | 4 GB | 50 sec | 0 |
| RocksDBState with Incremental Checkpoint (Avro) | 1 | 1 | 3 GB | 1.82 GB | 4.63 GB | 1.5 | 207 MB | 3 sec | 3 GB |
Input rate 20k/s (~15.6 MB/s)

| Type | No. of TMs | Parallelism | Heap Allocation per TM | Heap Usage per TM | Pod Memory Usage per TM | CPU Usage per TM | Checkpoint Size | Checkpoint Duration | Flink Managed Memory |
|---|---|---|---|---|---|---|---|---|---|
| HashMapState with Full Checkpoint | 2 | 2 | 10 GB | 8.69 GB | 11.2 GB | 1.3 | 8.39 GB | 50 sec | 0 |
| RocksDBState with Incremental Checkpoint (Avro) | 2 | 2 | 3 GB | 1.87 GB | 4.71 GB | 1.4 | 404 MB | 3 sec | 3 GB |
Input rate 30k/s (~23.5 MB/s)

| Type | No. of TMs | Parallelism | Heap Allocation per TM | Heap Usage per TM | Pod Memory Usage per TM | CPU Usage per TM | Checkpoint Size | Checkpoint Duration | Flink Managed Memory |
|---|---|---|---|---|---|---|---|---|---|
| HashMapState with Full Checkpoint | 3 | 3 | 10 GB | 9.74 GB | 11.2 GB | 1.2 | 11.8 GB | 65 sec | 0 |
| RocksDBState with Incremental Checkpoint (Avro) | 3 | 3 | 3 GB | 1.72 GB | 3.75 GB | 1.4 | 576 MB | 5 sec | 3 GB |
As you can see from the above experiment, the checkpoint duration drops sharply with incremental checkpointing, which significantly reduces the checkpointing overhead on the application.
Summary
Below is the summary of the experiment.
| | HashMapStateBackend with Full Checkpoint | RocksDBStateBackend with Incremental Checkpoint |
|---|---|---|
| Application Latency | Low latency because data is stored as Java objects on the heap. Reads and writes don't involve any serialization. | Since serialization is involved in every read and write, application latency will be higher. |
| Scalability | Less scalable for jobs with large state. | Highly scalable for jobs with large and slowly changing state. |
| Fault Tolerance | Highly fault tolerant. | Highly fault tolerant. |
| Checkpoint Duration | Checkpoint duration is high because the entire state is snapshotted every time. | Checkpoint duration is low because only the delta since the last checkpoint is saved. |
| Recovery Complexity | Recovery is easy because only one snapshot has to be loaded. | Recovery is complex because RocksDB has to rebuild the state from multiple checkpoints, and much depends on network speed. |
| State Backend Support | Full checkpointing is supported by both HashMapStateBackend and RocksDBStateBackend. | Incremental checkpointing is supported only by RocksDBStateBackend. |
| State Snapshot | Saves the entire state at every checkpoint. | Saves only the delta since the last successful checkpoint. |
| Heap Size | Since the state is stored on the heap before checkpointing, the heap requirement is high, and more GC cycles are to be expected. | State is stored off-heap and possibly on the local disk, so less heap space is needed and fewer GC cycles are expected. |
| State Backend Size | Limited to the maximum heap allocated to the JVM. | Not limited by the JVM heap, only by the available disk space. |
| Performance Impact | Higher impact on processing because the full state is snapshotted. | Lower impact on processing because only the delta is snapshotted. |
| CPU | CPU is used only for processing and GC; no state backend serialization is involved. | CPU usage is higher than with full checkpointing for the same input rate. It can be optimized with a suitable serialization mechanism; we experimented with Avro and got much better results compared to Kryo. |
| Best Use Case | Good for a smaller state that changes frequently. | Good for a larger state that changes slowly. |