Problems With Kafka Streams: The Saga Continues
Problems with the old version of Kafka haven't been solved completely. Some shortcomings have been fixed while some problems have been introduced. The cycle continues.
This post continues where the previous article about Kafka Streams left off, so be sure to check it before proceeding with the current one.
After many long years, the first stable Kafka version has been released: 1.0.0. In the previous version of Kafka Streams (0.11.0.0), we saw some suboptimal behavior when using the Processor API. With the new version came updates that are supposed to alleviate some pressure from the developer when using Kafka Streams.
Kafka Streams Overview
Before diving directly into the problem, let's see how a Kafka Streams application is implemented. Warning: There are traces of some actual code!
Firstly, we need a main class that will contain the topology, so let's implement a dummy one:
import java.util.Properties
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.processor.TopologyBuilder

object DummyMainClass {

  def main(args: Array[String]): Unit = {
    val properties = CreateKafkaStreamProperties()

    val topology: TopologyBuilder = new TopologyBuilder
    topology.addSource("Input", properties.inputTopic)
    topology.addProcessor("Input_processor", new DummyProcessor(properties.punctuateTime), "Input")
    topology.addSink("Output", properties.outputTopic, "Input_processor")

    val streams: KafkaStreams = new KafkaStreams(topology, properties)
    streams.start()
  }

  def CreateKafkaStreamProperties(): Properties = {
    //some code
  }
}
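The CreateKafkaStreamProperties helper is elided above. A minimal sketch of what it might contain, assuming an application-specific holder that extends java.util.Properties and carries the inputTopic, outputTopic, and punctuateTime fields used by the topology; the class name, topic names, and broker address are illustrative, not from the original (with such a holder, the helper would return it instead of plain Properties):

import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsConfig

// Hypothetical holder: plain Properties plus the fields the topology refers to.
class StreamProperties extends Properties {
  var inputTopic: String = _
  var outputTopic: String = _
  var punctuateTime: Long = _
}

// Sketch of the elided helper inside DummyMainClass.
def CreateKafkaStreamProperties(): StreamProperties = {
  val properties = new StreamProperties
  // Standard Kafka Streams configuration.
  properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "dummy-streams-app")
  properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName)
  properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName)
  // Application-specific settings used by the topology and the DummyProcessor.
  properties.inputTopic = "input-topic"
  properties.outputTopic = "output-topic"
  properties.punctuateTime = 10000L // ten seconds, matching the tests below
  properties
}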
In several lines of code, we have created a topology for the Kafka Streams application that reads messages from the input topic, processes them with DummyProcessor, and outputs them to the output topic. For more details, consult the documentation.
Next in line is DummyProcessor:
import org.apache.kafka.streams.processor.{Processor, ProcessorContext, ProcessorSupplier}

class DummyProcessor(punctuateTime: Long) extends ProcessorSupplier[String, String] with Logging {

  override def get(): Processor[String, String] = new Processor[String, String] {
    var context: ProcessorContext = _
    var arrived: Int = 0

    override def init(context: ProcessorContext): Unit = {
      this.context = context
      this.context.schedule(punctuateTime)
    }

    override def process(key: String, value: String): Unit = {
      this.arrived += 1
      logger.debug(s"Arrived: $value")
    }

    override def punctuate(timestamp: Long): Unit = {
      logger.debug("Punctuate call")
      this.context.forward(null, s"Processed: ${this.arrived}")
      this.arrived = 0
      this.context.commit()
    }

    override def close(): Unit = {
      // does nothing
    }
  }
}
This DummyProcessor is only used for demonstration purposes. Its job is to output to the topic the number of messages that arrived during each punctuate period; for debugging, we have added console logging. Now, we have the entire pipeline needed for testing how Kafka Streams operates.
The input topic has three partitions and a replication factor of 1. The output topic has only one partition and a replication factor of 1.
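For completeness, the two topics can be created programmatically with the Kafka AdminClient. A minimal sketch, assuming a single local broker and the illustrative topic names input-topic and output-topic:

import java.util.{Arrays, Properties}
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}

object CreateTopics {
  def main(args: Array[String]): Unit = {
    val props = new Properties
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    val admin = AdminClient.create(props)

    // Input topic: three partitions, replication factor 1.
    // Output topic: one partition, replication factor 1.
    val topics = Arrays.asList(
      new NewTopic("input-topic", 3, 1.toShort),
      new NewTopic("output-topic", 1, 1.toShort))

    admin.createTopics(topics).all().get()
    admin.close()
  }
}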
Testing Kafka Streams
We are going to repeat the testing from the previous article. Input messages for all tests are going to be the same: message_<offset>. The punctuate period is set to occur every ten seconds. There are two scenarios that we need to check for both Kafka versions:
For the first test, we have a constant rate of messages on the input topic: one message per second.
For the second test, we have nine messages (one message per second), followed by a pause that lasts 21 seconds, followed by one more message.
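For the first scenario, the input can be generated with a plain Kafka producer. A rough sketch, where the broker address and topic name are assumptions and the loop index stands in for the record's eventual offset:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

object TestProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    val producer = new KafkaProducer[String, String](props)

    // Constant rate: one message per second; with a null key the messages
    // are spread round-robin over the three input partitions.
    for (i <- 0 until 60) {
      producer.send(new ProducerRecord[String, String]("input-topic", s"message_$i"))
      Thread.sleep(1000)
    }
    producer.close()
  }
}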
Older Kafka Streams Version
In the first test scenario, we see ten messages on the console in the format "Arrived: message_<offset>", accompanied by one "Punctuate call", followed by one message in the output topic. Messages in the output topic are "Processed: 10". This behavior repeats as long as we have messages on the input stream.
In the second test scenario, we see nine messages on the console in the format "Arrived: message_<offset>". After the 21-second pause has passed, three messages appear on the console that read "Punctuate call". On the output topic, we can see three messages: "Processed: 10", "Processed: 0", and "Processed: 0". The late message ended up in the same time window as the first nine messages, and we have two unwanted punctuate calls.
New Kafka Streams Version
After switching to the new version, we have to make some modifications in the code:
The TopologyBuilder in the main class has to be changed to the Topology class.
The schedule method in the DummyProcessor class is now deprecated.
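With the first change applied, the topology wiring in the main class looks as follows (a sketch reusing the names from the example above; only the builder class changes):

import org.apache.kafka.streams.{KafkaStreams, Topology}

// Same wiring as before, but built on the new Topology class.
val topology: Topology = new Topology
topology.addSource("Input", properties.inputTopic)
topology.addProcessor("Input_processor", new DummyProcessor(properties.punctuateTime), "Input")
topology.addSink("Output", properties.outputTopic, "Input_processor")

val streams: KafkaStreams = new KafkaStreams(topology, properties)
streams.start()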
One would expect that by changing the version, the previous behavior would remain the same. Well, it hasn't. What has changed?
A punctuate call now follows each process call, and punctuate also occurs whenever the scheduled punctuateInterval elapses. This means the following:
In the first test scenario, each "Arrived: message_<offset>" message in the console is accompanied by a "Punctuate call". Unsurprisingly, we have one "Processed: 1" message in the output topic. After ten messages, we have another "Punctuate call" and "Processed: 0" pair.
In the second scenario, we have nine "Arrived: message_<offset>" and "Punctuate call" pairs on the console, followed by nine "Processed: 1" messages in the output topic. After the pause and the tenth message, we have one "Arrived: message_<offset>" and three "Punctuate call" messages. In the output topic, we see "Processed: 1", "Processed: 0", and "Processed: 0".
Now, if we want the same behavior as with the previous version, we need to manually check whether the punctuation period has passed and only then output the message to the output topic and reset the counter, as sketched below. From one problem (multiple punctuate calls for the late messages), we now have two problems (multiple punctuate calls for the late messages and punctuate calls after each processed message).
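A minimal sketch of that manual check, assuming we add a lastEmit field next to the arrived counter in the Processor from the example above and keep the old-style schedule(punctuateTime) call; the field name is ours, not from the original:

// Tracks the stream time of the last emission; declared next to `arrived`.
var lastEmit: Long = 0L

override def punctuate(timestamp: Long): Unit = {
  // Only emit when a full punctuateTime interval has elapsed since the last emission;
  // the extra punctuate calls triggered after every process call are ignored.
  // Note: the very first call emits immediately unless lastEmit is seeded in init().
  if (timestamp - lastEmit >= punctuateTime) {
    this.context.forward(null, s"Processed: ${this.arrived}")
    this.arrived = 0
    this.lastEmit = timestamp
    this.context.commit()
  }
}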
Putting that aside, we have noticed that the schedule method is deprecated. Along with the new version came a new API for scheduling. The old format has been preserved for backward compatibility, but it works differently.
The new schedule method has the following format:
context.schedule(punctuateInterval, PunctuationType, Punctuator)
The first parameter remains the same: the interval at which scheduled punctuation should occur. The second parameter is something new: PunctuationType. It's an enum with two values: STREAM_TIME and WALL_CLOCK_TIME (more on them later). The third and last parameter is Punctuator, an interface with only one method, punctuate(Long). With these changes, our DummyProcessor has a new look:
import org.apache.kafka.streams.processor.{Processor, ProcessorContext, ProcessorSupplier, PunctuationType, Punctuator}

class DummyProcessor(punctuateTime: Long) extends ProcessorSupplier[String, String] with Logging {

  override def get(): Processor[String, String] = new Processor[String, String] {
    var context: ProcessorContext = _
    var arrived: Int = 0

    override def init(context: ProcessorContext): Unit = {
      this.context = context
      this.context.schedule(punctuateTime, PunctuationType.STREAM_TIME, new Punctuator {
        override def punctuate(timestamp: Long): Unit = {
          logger.debug("Inner punctuate call")
          context.forward(null, s"Inner processed: $arrived")
          arrived = 0
          context.commit()
        }
      })
    }

    override def process(key: String, value: String): Unit = {
      this.arrived += 1
      logger.debug(s"Arrived: $value")
    }

    override def punctuate(timestamp: Long): Unit = {
      logger.debug("Punctuate call")
      this.context.forward(null, s"Processed: ${this.arrived}")
      this.arrived = 0
      this.context.commit()
    }

    override def close(): Unit = {
      // does nothing
    }
  }
}
This is not a typo: we have two punctuate methods. The only difference is that the inner punctuation method has the Inner prefix in its messages. Running both test scenarios yields almost identical results; the only difference is in the outputted messages. On the console, "Inner punctuate call" will replace "Punctuate call" messages, and in the output topic, "Inner processed: *" will replace "Processed: *" messages. We have identical behavior as when using the schedule(punctuateInterval) method, but the outer punctuate method is never called, even though we are obligated to implement it.
Using WALL_CLOCK_TIME and running the test scenarios, we get the following results:
In the first scenario, after starting the application, three punctuate calls occur (one for each partition in the input topic). After each processed message, one inner punctuate call follows. After the tenth message, we see three punctuate calls.
In the second scenario, the init method is followed by three punctuate calls. Each of the first nine messages is followed by an inner punctuate call. Then, after one second, we see three scheduled inner punctuate calls. As no messages arrive for another ten seconds, we again see three scheduled inner punctuate calls. Ten more seconds pass, and we see three scheduled inner punctuate calls, followed by the processing of the tenth message and one punctuate call (for the partition to which the message arrived).
WALL_CLOCK_TIME solves the problem of multiple consecutive punctuate calls, but it introduces the problem of a punctuate call after each processed message. We are just shifting the problem from one place to another.
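For reference, switching between the two modes is only a matter of the PunctuationType passed to schedule (assuming the Punctuator instance from the example above is held in a punctuator value):

// Stream-time punctuation: advances only as record timestamps advance.
context.schedule(punctuateTime, PunctuationType.STREAM_TIME, punctuator)

// Wall-clock punctuation: advances with system time, independent of traffic.
context.schedule(punctuateTime, PunctuationType.WALL_CLOCK_TIME, punctuator)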
A few notes:
After starting Kafka Streams with WALL_CLOCK_TIME, before any messages are consumed, the inner punctuate for each partition is called.
The duration of the process method should be shorter than the punctuate interval. If the process method takes longer than the punctuate interval, multiple consecutive punctuate calls will occur.
Inside the process method, we can call context.partition() to get the id of the partition from which the message arrived. Even though Kafka Streams creates one task per input partition, calling context.partition() inside a punctuate method (both inner and outer) will return -1. This means that in punctuate, we can't know which partition the code refers to (see the sketch after these notes).
Punctuate methods are called after the process method because we have multiple partitions and the rate of input messages is relatively slow compared to the punctuate period. If we increased the message rate, for many messages, punctuate would not be called after a process method.
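To illustrate the partition note, a small sketch of logging the partition in both places inside the Processor from the example above; the log messages are ours:

override def process(key: String, value: String): Unit = {
  // Returns the id of the partition the current record came from (0, 1, or 2 here).
  logger.debug(s"process() called for partition ${this.context.partition()}")
}

override def punctuate(timestamp: Long): Unit = {
  // Returns -1: a punctuation is not tied to a single input record, so the
  // partition cannot be determined here.
  logger.debug(s"punctuate() called for partition ${this.context.partition()}")
}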
Conclusion
Upgrading to the first stable Kafka version requires testing and checking existing applications. Some classes have been replaced or renamed, so users are forced to notice those changes just to get the code to compile. The real problem lies in changes that don't directly require intervention: there have been changes under the hood, and a Kafka Streams application that compiles with the newer version will now produce different results.
The problems mentioned in the previous article haven't been solved completely. Some shortcomings have been fixed while some new problems have been introduced — the cycle continues.