Problems With Kafka Streams
Kafka Streams enables easy and powerful stream processing of Kafka events. Apps that use Kafka Streams can be run in virtually any environment. But the library isn't perfect.
Originally published December 11, 2017
Before diving straight into the main topic, let me introduce you to Kafka Streams first.
We have all heard about Apache Kafka, as it has been used extensively in the big data and stream processing world. For those who are new to the Kafka world, this blog written by Nikola Ivancevic is a good intro. Kafka Streams is a library that comes with Apache Kafka. It enables easy and powerful stream processing of Kafka events. Existing applications can use the Kafka Streams API by simply importing the library, which means that applications using Kafka Streams can run in virtually any environment.
The Kafka Streams API is implemented in Java. For now (version 0.11.0.0), only applications written for the JVM can use this library.
How It Works
The Kafka Streams library can be used in two distinct ways:
High-level stream DSL
Processor API
1. High-Level Stream DSL
Using the Stream DSL, a user can express transformations, aggregations, grouping, etc. with a few simple provided methods. In several lines of code and within a couple of minutes, it's possible to create and deploy a working application, at least in theory. When transforming data from one format to another, internal Kafka topics are used for storing intermediate results; for each such transformation in the chain, Kafka Streams will create one internal topic. With this approach, users enjoy all the benefits and security of Kafka without having to invest time in development. If anything unexpected occurs during runtime, the application is able to continue from where it was before the breakdown.
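To give a sense of the approach, here is a minimal sketch of a Stream DSL application written against the 0.11-era API. The topic names, transformations, and store name are illustrative assumptions, not taken from a real system; the key-changing step forces a repartition, so Kafka Streams creates internal topics behind the scenes.

```java
// A minimal Stream DSL sketch (0.11-era API); names are illustrative assumptions.
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class StreamDslExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-dsl-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> input = builder.stream("input-topic");

        input.mapValues(String::toUpperCase)        // stateless transformation
             .selectKey((key, value) -> value)      // changing the key forces a repartition,
             .groupByKey()                          // so an internal topic is created here
             .count("value-counts")                 // backed by an internal changelog topic
             .toStream()
             .to(Serdes.String(), Serdes.Long(), "output-topic");

        new KafkaStreams(builder, props).start();
    }
}
```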
All this sounds like a neat solution. But it has some serious drawbacks:
With each transformation, data has to be serialized and written into the topic. Then, for the next operation in the chain, it has to be read from the topic, meaning that all the side operations (partition key calculation, persisting to disk, etc.) happen for every entity.
Kafka Streams assumes that the Serde class used for serialization or deserialization is the one provided in the config. When the format of the data changes along the operation chain, the user has to provide the appropriate Serde. If existing Serdes can't handle the used format, the user has to create a custom Serde. No big deal: just implement the Serde interface along with a custom serializer and deserializer. From one class, we end up with four, which is not so optimal. So, for each custom data format in the operation chain, we create three additional classes (a minimal example follows this list). An alternative approach is to use generic JSON or Avro Serdes. One more thing: the user has to specify a Serde for both the key and value parts of the message.
Restarting the application. After a breakdown, the application will go through each internal topic to find the last valid offset, and this can take some time, especially if log compaction is not used and/or a retention period is not set up.
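As an illustration of the Serde boilerplate mentioned in the second point above, here is a minimal sketch of a custom JSON Serde built with Jackson for a hypothetical Event POJO; the class names and the choice of JSON are assumptions for the example, not anything prescribed by the Kafka Streams API.

```java
// One custom data format ends up needing three extra classes: a Serializer,
// a Deserializer, and the Serde that ties them together. Event is a hypothetical POJO.
import java.util.Map;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

public class EventSerde implements Serde<EventSerde.Event> {

    // Hypothetical domain object; any Jackson-serializable POJO would do.
    public static class Event {
        public String id;
        public long timestamp;
    }

    // Additional class #1: the serializer.
    public static class EventSerializer implements Serializer<Event> {
        private final ObjectMapper mapper = new ObjectMapper();
        @Override public void configure(Map<String, ?> configs, boolean isKey) { }
        @Override public byte[] serialize(String topic, Event event) {
            try {
                return event == null ? null : mapper.writeValueAsBytes(event);
            } catch (Exception e) {
                throw new RuntimeException("Failed to serialize Event", e);
            }
        }
        @Override public void close() { }
    }

    // Additional class #2: the deserializer.
    public static class EventDeserializer implements Deserializer<Event> {
        private final ObjectMapper mapper = new ObjectMapper();
        @Override public void configure(Map<String, ?> configs, boolean isKey) { }
        @Override public Event deserialize(String topic, byte[] data) {
            try {
                return data == null ? null : mapper.readValue(data, Event.class);
            } catch (Exception e) {
                throw new RuntimeException("Failed to deserialize Event", e);
            }
        }
        @Override public void close() { }
    }

    // Additional class #3: the Serde itself, wired into the config or a specific operation.
    @Override public void configure(Map<String, ?> configs, boolean isKey) { }
    @Override public void close() { }
    @Override public Serializer<Event> serializer() { return new EventSerializer(); }
    @Override public Deserializer<Event> deserializer() { return new EventDeserializer(); }
}
```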
2. Processor API
Another way to use Kafka Streams is to use the Processor API. The Processor API provides complete flexibility; the user can implement anything not supported by the Stream DSL. It's intended for users who haven't completely grasped the Stream DSL or for when some exotic functionality is required. To use the Processor API, the user has to provide a concrete implementation of ProcessorSupplier that returns the user's implementation of the Processor class.
The user's implementation of the Processor class needs to override four methods:
init: A method that will be called upon creating an instance of the Processor class. It provides a ProcessorContext object for the user. ProcessorContext is commonly used to schedule punctuation periods, forward transformed messages to the output topic, or commit the Processor state.
process: A method that is called for each arriving record; this is where the transformations usually happen.
punctuate: A scheduled method that is called at the configured time interval, often used to send messages in batches, export metrics, populate a database, etc.
close: A method called upon terminating the app to close connections.
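To show how these four methods fit together, here is a minimal sketch of a Processor written against the 0.11-era API (where punctuate is still a method on Processor rather than a separate Punctuator). It matches the batch-insert scenario discussed in the next section; the class name and the commented-out bulkInsert and reportMetrics calls are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;

public class BatchingProcessor implements Processor<String, String> {

    private ProcessorContext context;
    private final List<String> batch = new ArrayList<>();

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        context.schedule(10_000L);   // ask Kafka Streams to call punctuate() every ten seconds
        // Hypothetical: open database connections here.
    }

    @Override
    public void process(String key, String value) {
        batch.add(value.trim());     // parse/transform the record and keep it for the bulk insert
    }

    @Override
    public void punctuate(long timestamp) {
        // bulkInsert(batch);        // hypothetical database call
        // reportMetrics(batch);     // hypothetical metrics export
        batch.clear();
        context.commit();            // commit the processed offsets
    }

    @Override
    public void close() {
        // Hypothetical: close database connections here.
    }

    public static ProcessorSupplier<String, String> supplier() {
        return BatchingProcessor::new;
    }
}
```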
Where Does the Problem Lie?
To completely understand the problem, we will first go into detail about how ingestion and processing occur by default in Kafka Streams. For example purposes, the punctuate method is configured to occur every ten seconds, and the input stream contains exactly one message per second. The purpose of the job is to parse input messages, collect them, and, in the punctuate method, do a batch insert into the database and then send metrics.
After starting the Kafka Streams application, the Processor will be created, followed by a call to the init method. This is where all the connections are established. Upon a successful start, the application will listen to the input topic for incoming messages. It will remain idle until the first message arrives. When the first message arrives, the process method is called; this is where transformations occur and where the result is stored for later use. If no messages are in the input topic, the application will go idle again, waiting for the next message. After each successful process, the application checks whether punctuate should be called. In our case, we will have ten process calls followed by one punctuate call, with this cycle repeating indefinitely as long as there are messages.
A pretty obvious behavior, isn't it? Then why does one of those sentences deserve a second look?
This is where things get tricky. An application can call more than one punctuate method in succession. To understand why, read the following sentence again: after each successful process, the application checks whether punctuate should be called.
What would happen if we had processed nine messages and there were no more messages in the input topic?
The application will wait until new messages arrive in the topic. But the punctuate period has already started, and now more than ten seconds have passed. The application is still idle and will remain so, regardless of the started punctuate period. For the purpose of this example, the next message arrives one hour later. That message will be processed and added to the same punctuate period before the insert is called for the entire batch. After the first expected punctuate, the application will call 359 more punctuate methods (60 minutes * 6 punctuates per minute). Only after finishing these 360 punctuate calls will the application resume consuming input messages.
There are two problems with this approach:
Messages that arrive long after the last processed message can end up in the wrong batch.
Multiple punctuate methods can be called in succession.
If we want to group messages in timed windows, so that each window contains only records that arrived in Kafka in the exact time bucket, we should check the timestamp of each arriving message before adding it to the batch. After a bulk insert (to a database, to Kafka, etc.), save the current time and use it to manually check whether arriving messages belong to the next batch. The first message that doesn't belong to the current batch will trigger a bulk insert and start a new batch.
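Here is a hedged sketch of that manual check, continuing the hypothetical BatchingProcessor from earlier; the lastFlushTime field, the BATCH_INTERVAL_MS constant, and the commented-out bulkInsert call are illustrative assumptions.

```java
// process() itself checks the record timestamp against the time of the last bulk
// insert, so a late arrival flushes the old batch instead of landing in it.
private static final long BATCH_INTERVAL_MS = 10_000L;
private long lastFlushTime;

@Override
public void process(String key, String value) {
    long recordTime = context.timestamp();   // timestamp of the arriving record
    // A record falling outside the current window flushes the old batch first,
    // regardless of whether punctuate() fired on time.
    if (!batch.isEmpty() && recordTime - lastFlushTime >= BATCH_INTERVAL_MS) {
        // bulkInsert(batch);                // hypothetical database call
        batch.clear();
        lastFlushTime = recordTime;
    }
    batch.add(value);
}
```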
What if we are outputting metrics during the punctuate method? One solution is to remember the time of the last punctuate: a new punctuate will produce output only if the correct amount of time has passed. Again, for the purpose of this example, let's say that the first punctuate takes one second to complete and the remaining 359 punctuate calls in succession take another second to check (without outputting metrics). The application will then process messages for only eight seconds before the next bulk insert. The batch collection period has been shortened by 20% because of the punctuate calls.
Why does it last only eight seconds?
Kafka Streams maps periods onto regular and "correct" intervals by default. It always starts counting from zero. For our configured interval, this looks like:
[0, 10000), [10000, 20000), …
Note that Kafka Streams counts the duration of intervals in milliseconds. This means that the scheduled time for the first punctuate call is 10,000 ms from the starting time. No matter how many input messages arrive, the first process call that finishes after 10,000 ms will trigger punctuate. The second punctuate is scheduled to occur 20,000 ms from the starting time; it is not affected by the duration of punctuate or process calls.
This will result in wrong metrics data (number of processed messages, latency, etc.).
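One possible shape of the time-based guard mentioned earlier (remembering the time of the last punctuate), sketched as an extension of the hypothetical BatchingProcessor; the constant and the commented-out calls are assumptions, not anything prescribed by the library.

```java
// punctuate() remembers when it last did real work and ignores the
// back-to-back catch-up invocations described above.
private static final long PUNCTUATE_INTERVAL_MS = 10_000L;
private long lastPunctuateTime;

@Override
public void punctuate(long timestamp) {
    long now = System.currentTimeMillis();
    if (now - lastPunctuateTime < PUNCTUATE_INTERVAL_MS) {
        return;  // a catch-up call fired immediately after the previous one; skip it
    }
    lastPunctuateTime = now;
    // bulkInsert(batch);      // hypothetical database call
    // reportMetrics(batch);   // hypothetical metrics export
    batch.clear();
}
```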
Multiple punctuate calls in succession can also occur if the punctuate method itself lasts longer than intended. For example, a network slowdown, a large database population, or high database usage (i.e. large inserts into the same tables, locking the table and preventing access) can make a punctuate call last longer than ten seconds (our interval period); say it lasted 30 seconds. This means that the next process will trigger punctuate again, followed by two more unwanted punctuate calls.
One can argue that if we have such "slow" input, then stream processing is perhaps not the correct solution. But this scenario can also occur in systems with "fast" input; it all depends on the rate of messages and the punctuate period. Many legacy systems still have regular downtimes when batch jobs are run, the database is updated, a new deploy is in progress, etc. Scheduled downtimes in the legacy part of a system can affect the correctness of the stream processing part of the system, and developers have to keep that in mind.
Another way to avoid the mentioned problems is to apply stateless processing: after each message is processed, send the result to the output topic and leave the batching to the Kafka producer. However, this only shifts problems down the line, as it introduces more database calls, more TCP/IP packets over the network, and more I/O calls to replicate and permanently store Kafka messages on disk.
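As a sketch of what that stateless variant might look like with the same 0.11-era Processor API (the transformation shown and the producer settings mentioned in the comments are assumptions), each record is forwarded downstream immediately; the forwarded records still have to be wired to a sink topic when the topology is built.

```java
// Forward every transformed record straight away and let the producer batch
// (e.g. via its linger.ms and batch.size settings) instead of batching locally.
@Override
public void process(String key, String value) {
    String transformed = value.trim();   // illustrative transformation
    context.forward(key, transformed);   // hand the result to the downstream sink
}
```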
Conclusion
The Kafka Streams library provides a performant and scalable solution that can be incorporated into virtually any existing application. It’s developed with ease of use in mind. It provides the most commonly used functions while allowing users to implement their own.
Kafka Streams is by no means a silver bullet, and the same solution might not work in different use cases. The Processor API is intended to be flexible, but that only places the implementation of corner cases on the user. The high-level Stream DSL provides several solutions for windowing and aggregations, while not covering every case.
Some frameworks like Apache Flink put a strong emphasis on windowing and correctness, at some cost in performance or usability. No matter the framework, corner cases always require special care.
Also read Part 2 and Part 3.