Kafka Streams
An introductory look at Kafka Streams, and how developers can use this tool to better handle large amounts of data in their applications.
In my previous article, I explained an example of a Kafka producer and consumer. In this article, I will explain the usage of Kafka Streams.
With Kafka producers and consumers you can create and consume records, but you cannot analyze them. For analysis you need a separate stream-processing application, such as Storm, which makes your pipeline dependent on another system. To avoid this dependency, Kafka provides the Streams API.
Kafka Streams is a client library to process and analyze the data stored in Kafka. Below are some highlights of Kafka Streams; a minimal application skeleton follows the list.
Simple and lightweight client library.
No external dependency on any system other than Apache Kafka.
Fault-tolerant local state. Local state is required to perform stateful operations like join, etc.
Supports exactly-once processing semantics, i.e. each record is processed only once.
Supports event-time based windowing operations.
Provides a high-level DSL and a low-level processor API to define the topology.
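To give a feel for the API, here is a minimal sketch of a Streams application that simply copies records from one topic to another. The application id, broker address, and topic names are placeholders, not values from this article.

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

public class StreamsSkeleton {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-skeleton");      // unique per application
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");     // placeholder broker address
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("input-topic");          // source processor
        source.to("output-topic");                                               // sink processor

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));        // close cleanly on shutdown
    }
}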
Key Concepts of Kafka Streams
In Kafka Streams, a stream represents an unbounded, continuously updating dataset of immutable records, where each record is defined as a key-value pair.
A Kafka Streams processing application defines its computational logic through one or more processor topologies, where a processor topology is a graph of stream processors (nodes) that are connected by streams (edges).
A stream processor is a node in a Streams topology. It receives a record from a topic or from its upstream processor, produces one or more records, and writes them to a Kafka topic or passes them on to a downstream processor.
Source Processor: In a stream topology, a source processor node consumes records from one or more Kafka topics and forwards them to its downstream processor nodes. It does not receive records from other processor nodes, i.e. it is at the top of the hierarchy.
Sink Processor: A sink processor node receives records from upstream processor nodes. It does not have any downstream processor nodes; it writes the records it receives back into a Kafka topic.
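As a small illustration of these node types (a sketch with assumed topic names), the topology below has one source processor reading from the input topic, one intermediate filter processor, and one sink processor writing to the output topic. Topology#describe() prints these nodes and how they are connected.

StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream("input-topic")        // source processor node
       .filter((key, value) -> value != null)        // intermediate stream processor node
       .to("output-topic");                          // sink processor node

// Prints the processor topology: source -> filter -> sink
System.out.println(builder.build().describe());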
Kafka Streams provides two ways to define streaming topologies.
Kafka Streams DSL: It provides inbuilt functions for data transformations, for example map, mapValues, filter, selectKey, flatMap, flatMapValues, merge, and branch.
map: Transforms each record of the input stream into a new record of the output stream.
mapValues: Creates a new stream which keeps all the input stream's records with the same key but changes the value.
selectKey: Creates a new stream which keeps all the input stream's records with the same value but changes the key.
filter: Creates a new stream which contains all the input stream's records that satisfy a given predicate.
flatMap: Transforms each record of the input stream into zero or more records in the output stream. Both the key and the value can be changed in the output records.
flatMapValues: Transforms each record of the input stream into zero or more records in the output stream. Each output record keeps the key of its input record, so only the value can be changed.
merge: Merges two streams into one.
branch: Takes an array of predicates as input and creates an array of KStreams. If an input record matches the predicate at, say, index 5, that record is assigned to the output stream at index 5.
There are many other operations, like join, aggregate, etc.
KStream<String, Transaction> source = builder.stream("sourcetransaction");
KStream<String, Transaction> possibleFraudulentTransactions = source.filter(new Predicate<String, Transaction>() {
    @Override
    public boolean test(String key, Transaction value) {
        // keep only transactions above the threshold
        return value.getAmount() > 10000;
    }
});
In the above example, a stream is created from the topic sourcetransaction, and, for each record, we check whether its amount is greater than 10000. If it is, the record is passed on to the filtered output stream possibleFraudulentTransactions.
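The other DSL operations follow the same pattern. As a rough sketch (the Transaction type and the output topic names are assumptions, as above), branch can split the same source stream into high-value and regular transactions:

// Split the stream with two predicates; a record goes to the first branch whose predicate matches.
KStream<String, Transaction>[] branches = source.branch(
        (key, value) -> value.getAmount() > 10000,   // index 0: high-value transactions
        (key, value) -> true                         // index 1: everything else
);
branches[0].to("high-value-transactions");
branches[1].to("regular-transactions");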
Processor API: This API enables developers to write their own custom record processors, connect them together, and interact with state stores. Below are some common functions of a processor.
init: This function is called when Kafka Streams is initializing tasks. It provides a ProcessorContext.
forward: This function forwards a key-value pair to the downstream processors, for example context.forward(key, value).
schedule: Schedules a periodic operation (punctuation) for a processor. This function can be called during the initialization of the processor or while processing. Time can be defined in two ways: stream time, which advances with the timestamps of incoming records, or wall-clock time. For example, let's say we have defined the interval as 1000 ms.
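As a sketch of those two variants (assuming a ProcessorContext field named context, a Kafka Streams version where schedule takes a Duration, and imports of java.time.Duration and org.apache.kafka.streams.processor.PunctuationType), the calls inside init could look like this. Note that the processor example below uses the older punctuate(long) callback instead.

// Fires whenever stream time (record timestamps) has advanced by at least 1000 ms.
context.schedule(Duration.ofMillis(1000), PunctuationType.STREAM_TIME,
        timestamp -> { /* periodic work driven by stream time */ });

// Fires every 1000 ms of wall-clock (system) time, independent of incoming records.
context.schedule(Duration.ofMillis(1000), PunctuationType.WALL_CLOCK_TIME,
        timestamp -> { /* periodic work driven by wall-clock time */ });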
public class SourceProcessor implements Processor<String, Transaction> {

    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        // Keep a reference to the context so records can be forwarded later.
        this.context = context;
    }

    @Override
    public void process(String key, Transaction value) {
        if (value.getAmount() > 10000) {
            context.forward(key, value); // filter records and pass them to the next processor
        }
    }

    @Override
    public void punctuate(long timestamp) {
        // TODO: periodic work scheduled via context.schedule(...)
    }

    @Override
    public void close() {
        // TODO: release any resources held by this processor
    }
}
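To actually run such a processor, it has to be wired into a topology. Below is a rough sketch; the node names, the output topic, and the Transaction serializer/deserializer (transactionSerializer, transactionDeserializer) are hypothetical placeholders, not part of the original example.

Topology topology = new Topology();
topology.addSource("TransactionsSource",
        new StringDeserializer(), transactionDeserializer,    // hypothetical Transaction deserializer
        "sourcetransaction");
topology.addProcessor("FraudFilter", SourceProcessor::new, "TransactionsSource");
topology.addSink("FraudSink", "possible-fraudulent-transactions",
        new StringSerializer(), transactionSerializer,        // hypothetical Transaction serializer
        "FraudFilter");

KafkaStreams streams = new KafkaStreams(topology, props);     // props configured as in a normal Streams app
streams.start();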
Conclusion
Kafka Streams provides an easy way to process continuous streams of data using the Kafka Streams DSL and the Processor API. The DSL provides inbuilt functions to process records and perform windowing, aggregation, and other operations, while the Processor API gives you more flexibility to write your own logic for processing records.