Apache Flink With Kafka - Consumer and Producer
Overview
Apache Flink provides various connectors to integrate with other systems. In this article, I will share an example of consuming records from Kafka through FlinkKafkaConsumer and producing records to Kafka using FlinkKafkaProducer.
Setup
I installed Kafka locally and created two topics, TOPIC-IN and TOPIC-OUT.
# Create two topics
./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic TOPIC-IN
./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic TOPIC-OUT
# List all topics
./bin/kafka-topics.sh --zookeeper localhost:2181 --list
TOPIC-IN
TOPIC-OUT
__consumer_offsets
I wrote a very simple NumberGenerator, which generates a number every second and sends it to TOPIC-IN using a KafkaProducer object. The code for both is available on GitHub.
A sample run produces the following output:
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TOPIC-IN --property print.key=true --from-beginning
myKey [1]
myKey [2]
myKey [3]
myKey [4]
myKey [5]
myKey [6]
myKey [7]
myKey [8]
myKey [9]
Flink Kafka Connector Example
First, define a FlinkKafkaConsumer, as shown below:
String TOPIC_IN = "TOPIC-IN";
String TOPIC_OUT = "TOPIC-OUT";
String BOOTSTRAP_SERVER = "localhost:9092";
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// to use allowed lateness and timestamp from kafka message
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
Properties props = new Properties();
props.put("bootstrap.servers", BOOTSTRAP_SERVER);
props.put("client.id", "flink-kafka-example");
// consumer to get both key/values per Topic
FlinkKafkaConsumer<KafkaRecord> kafkaConsumer = new FlinkKafkaConsumer<>(TOPIC_IN, new MySchema(), props);
// for allowing Flink to handle late elements
kafkaConsumer.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<KafkaRecord>()
{
public long extractAscendingTimestamp(KafkaRecord record)
{
return record.timestamp;
}
});
kafkaConsumer.setStartFromLatest();
StreamExecutionEnvironment.getExecutionEnvironment() gets a local Flink StreamExecutionEnvironment.
setStreamTimeCharacteristic(TimeCharacteristic.EventTime) is required to use the timestamp carried in the messages coming from Kafka. Otherwise, Flink will use the system clock.
The FlinkKafkaConsumer<> object acts as our source. The class KafkaRecord is a wrapper for the key and value coming from Kafka, and the MySchema class implements KafkaDeserializationSchema<KafkaRecord> to provide the deserialization logic used by Flink to convert the byte[] from Kafka to String. The code for both is available here; a minimal sketch of both classes follows this list. This is required because I want to read both the key and value of the Kafka messages.
assignTimestampsAndWatermarks(...) tells Flink where it should read the event timestamp from. This is used to decide the start and end of a tumbling time window.
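As a rough sketch, the two classes could look like the following (shown together here; in the repository they would be separate files). The field names are assumptions based on how they are used in this article; the actual code is in the linked GitHub repo.
import java.nio.charset.StandardCharsets;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
// Simple wrapper for the key, value, and timestamp of a Kafka message.
public class KafkaRecord
{
    public String key;
    public String value;
    public long timestamp;
}
// Deserializer that converts the raw byte[] key and value into Strings.
public class MySchema implements KafkaDeserializationSchema<KafkaRecord>
{
    @Override
    public boolean isEndOfStream(KafkaRecord nextElement)
    {
        return false; // the stream never ends
    }
    @Override
    public KafkaRecord deserialize(ConsumerRecord<byte[], byte[]> record)
    {
        KafkaRecord data = new KafkaRecord();
        data.key = record.key() == null ? null : new String(record.key(), StandardCharsets.UTF_8);
        data.value = record.value() == null ? null : new String(record.value(), StandardCharsets.UTF_8);
        data.timestamp = record.timestamp(); // timestamp stored in the Kafka message
        return data;
    }
    @Override
    public TypeInformation<KafkaRecord> getProducedType()
    {
        return TypeInformation.of(KafkaRecord.class);
    }
}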
After this, we need to define a FlinkKafkaProducer, as shown below:
Properties prodProps = new Properties();
prodProps.put("bootstrap.servers", BOOTSTRAP_SERVER);
FlinkKafkaProducer<KafkaRecord> kafkaProducer =
new FlinkKafkaProducer<KafkaRecord>(TOPIC_OUT,
((record, timestamp) -> new ProducerRecord<byte[], byte[]>(TOPIC_OUT, record.key.getBytes(), record.value.getBytes())),
prodProps,
Semantic.EXACTLY_ONCE);
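One caveat with Semantic.EXACTLY_ONCE, not covered in the snippet above: it relies on Kafka transactions, which Flink only commits when a checkpoint completes, and the producer's transaction timeout must fit within the broker's transaction.max.timeout.ms (15 minutes by default, while Flink's producer defaults to one hour). A minimal sketch of the extra configuration, with values chosen purely for illustration:
// enable checkpointing so that the Kafka transactions actually get committed
env.enableCheckpointing(5000); // checkpoint interval in ms, illustrative value
// set before constructing the FlinkKafkaProducer: keep the transaction timeout
// within the broker's transaction.max.timeout.ms
prodProps.put("transaction.timeout.ms", "60000");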
Now, we can define a simple pipeline, as shown below:
DataStream<KafkaRecord> stream = env.addSource(kafkaConsumer);
stream
.filter((record) -> record.value != null && !record.value.isEmpty())
.keyBy(record -> record.key)
.timeWindow(Time.seconds(5))
.allowedLateness(Time.seconds(1))
.reduce(new ReduceFunction<KafkaRecord>()
{
KafkaRecord result = new KafkaRecord();
public KafkaRecord reduce(KafkaRecord record1, KafkaRecord record2) throws Exception
{
result.key = "outKey";
result.value = record1.value+record2.value;
return result;
}
})
.addSink(kafkaProducer);
// produce a number as a string every second (p is the KafkaProducer the generator uses; see the GitHub code)
new NumberGenerator(p, TOPIC_IN).start();
// start flink
env.execute();
env.addSource(kafkaConsumer) creates a DataStream with the FlinkKafkaConsumer object as the source.
filter(...) drops null and empty values coming from Kafka.
keyBy(record -> record.key) keys the stream on the key present in the Kafka messages. This logically partitions the stream and allows parallel execution on a per-key basis.
timeWindow(Time.seconds(5)) and allowedLateness(Time.seconds(1)) define a time window of five seconds and allow an extra second of lateness.
The ReduceFunction appends all the numbers collected in a window and sends the result with the new key "outKey".
addSink(kafkaProducer) sends the output of each window to the FlinkKafkaProducer object created above.
new NumberGenerator(p, TOPIC_IN).start() starts the NumberGenerator.
env.execute() starts the Flink job.
A sample run of this code produces the following output:
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TOPIC-OUT --property print.key=true --from-beginning
outKey [5][6]
outKey [7][8][9][10][11]
outKey [12][13][14][15][16]
outKey [17][18][19][20][21]
outKey [22][23][24][25][26]
Conclusion
The above example shows how to use Flink's Kafka connector API to consume and produce messages to Kafka, and how to customize deserialization when reading data from Kafka.