Kafka Avro Scala Example
Learn how to write and read messages in Avro format to/from Kafka: how to produce Avro-encoded messages, how to send them to Kafka, how to consume them with a consumer, and finally how to decode them.
Let's learn how to write and read messages in Avro format to/from Kafka. Read on to understand how to produce Avro-encoded messages, how to send them to Kafka, how to consume them, and finally how to decode them. Instead of plain-text messages, we will serialize our messages with Avro, which allows us to send much more complex data structures over the wire.
Avro
Apache Avro is a language-neutral data serialization format. Avro data is described in a language-independent schema. The schema is usually written in JSON, and serialization is usually to binary files, although serialization to JSON is also supported.
Let's add the Avro dependency to our build:
"org.apache.avro" % "avro" % "1.7.7"
We will consider a schema like this:
{
  "namespace": "kafka.avro.test",
  "type": "record",
  "name": "user",
  "fields": [
    { "name": "id", "type": "int" },
    { "name": "name", "type": "string" },
    { "name": "email", "type": ["string", "null"] }
  ]
}
You can instantiate the schema as follows:
val schema: Schema = new Schema.Parser().parse(SCHEMA_STRING)
Here, SCHEMA_STRING is the JSON listed above as a Java String.
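For example, you could embed the schema as a triple-quoted Scala string (a minimal sketch; the producer and consumer below load the same schema from a schema.avsc resource file instead):
val SCHEMA_STRING: String =
  """{
    |  "namespace": "kafka.avro.test",
    |  "type": "record",
    |  "name": "user",
    |  "fields": [
    |    { "name": "id", "type": "int" },
    |    { "name": "name", "type": "string" },
    |    { "name": "email", "type": ["string", "null"] }
    |  ]
    |}""".stripMargin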
Now, we can create an Avro generic record object with the instantiated schema and put user data into it.
val genericUser: GenericRecord = new GenericData.Record(schema)
genericUser.put("id", 1)
genericUser.put("name", "singh")
genericUser.put("email", null)
After creating the generic record, we need to serialize it. Here we will use the Avro binary encoder to encode the object into a byte array.
val writer = new SpecificDatumWriter[GenericRecord](schema)
val out = new ByteArrayOutputStream()
val encoder: BinaryEncoder = EncoderFactory.get().binaryEncoder(out, null)
writer.write(genericUser, encoder)
encoder.flush()
out.close()
val serializedBytes: Array[Byte] = out.toByteArray()
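As a quick sanity check, you can decode those bytes straight back into a generic record with the same schema (a small round-trip sketch):
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory

val reader = new GenericDatumReader[GenericRecord](schema)
val decoder = DecoderFactory.get().binaryDecoder(serializedBytes, null)
val decodedUser: GenericRecord = reader.read(null, decoder)
println(decodedUser) // prints the record as JSON, e.g. {"id": 1, "name": "singh", "email": null}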
You can also use one of the many third-party libraries for Avro serialization and deserialization; pick whichever API you find most convenient.
So, it's time to send the serialized message to Kafka using a producer. Here is the entire Kafka producer code:
Producer
import java.util.{Properties, UUID}
import org.apache.avro.Schema
import org.apache.avro.Schema.Parser
import domain.User
import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericRecord
import org.apache.avro.specific.SpecificDatumWriter
import java.io.ByteArrayOutputStream
import org.apache.avro.io._
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import scala.io.Source
class KafkaProducer() {

  private val props = new Properties()

  props.put("metadata.broker.list", "localhost:9092")
  props.put("message.send.max.retries", "5")
  props.put("request.required.acks", "-1")
  props.put("serializer.class", "kafka.serializer.DefaultEncoder")
  props.put("client.id", UUID.randomUUID().toString())

  private val producer = new Producer[String, Array[Byte]](new ProducerConfig(props))

  val topic = "demo-topic" // same topic the consumer below listens on

  // Read avro schema file
  val schema: Schema = new Parser().parse(Source.fromURL(getClass.getResource("/schema.avsc")).mkString)

  // Create avro generic record object
  val genericUser: GenericRecord = new GenericData.Record(schema)

  // Put data in that generic record ("id" is an int in the schema)
  genericUser.put("id", 1)
  genericUser.put("name", "sushil")
  genericUser.put("email", null)

  // Serialize generic record into byte array
  val writer = new SpecificDatumWriter[GenericRecord](schema)
  val out = new ByteArrayOutputStream()
  val encoder: BinaryEncoder = EncoderFactory.get().binaryEncoder(out, null)
  writer.write(genericUser, encoder)
  encoder.flush()
  out.close()
  val serializedBytes: Array[Byte] = out.toByteArray()

  // Wrap the bytes in a keyed message and send it to the topic
  val queueMessage = new KeyedMessage[String, Array[Byte]](topic, serializedBytes)
  producer.send(queueMessage)
}
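Because everything above runs in the class body, simply constructing the class publishes one Avro-encoded user (a usage sketch; ProducerApp is just an illustrative name):
object ProducerApp extends App {
  new KafkaProducer() // builds, serializes, and sends one user record to demo-topic
}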
Now, just as we set up the producer to send binary messages, we will create a consumer that consumes messages from Kafka, deserializes them, and rebuilds the generic record.
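The consumer imports domain.User, a plain case class that mirrors the Avro schema (its exact definition is in the linked GitHub project; a minimal sketch could look like this):
package domain

case class User(id: Int, name: String, email: Option[String])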
Consumer
import java.util.Properties
import domain.User
import org.apache.avro.Schema
import org.apache.avro.io.DatumReader
import org.apache.avro.io.Decoder
import org.apache.avro.specific.SpecificDatumReader
import org.apache.avro.generic.GenericRecord
import org.apache.avro.io.DecoderFactory
import kafka.consumer.{Consumer, ConsumerConfig, ConsumerTimeoutException, Whitelist}
import kafka.serializer.DefaultDecoder
import scala.io.Source
class KafkaConsumer() {

  private val props = new Properties()

  val groupId = "demo-topic-consumer"
  val topic = "demo-topic"

  props.put("group.id", groupId)
  props.put("zookeeper.connect", "localhost:2181")
  props.put("auto.offset.reset", "smallest")
  props.put("consumer.timeout.ms", "120000")
  props.put("auto.commit.interval.ms", "10000")

  private val consumerConfig = new ConsumerConfig(props)
  private val consumerConnector = Consumer.create(consumerConfig)
  private val filterSpec = new Whitelist(topic)
  private val streams = consumerConnector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder())(0)

  lazy val iterator = streams.iterator()

  // Read avro schema file
  val schemaString = Source.fromURL(getClass.getResource("/schema.avsc")).mkString

  // Initialize schema
  val schema: Schema = new Schema.Parser().parse(schemaString)
  def read(): Option[User] =
    try {
      if (hasNext) {
        println("Getting message from queue.............")
        val message: Array[Byte] = iterator.next().message()
        getUser(message)
      } else {
        None
      }
    } catch {
      case ex: Exception =>
        ex.printStackTrace()
        None
    }

  private def hasNext: Boolean =
    try
      iterator.hasNext()
    catch {
      case timeOutEx: ConsumerTimeoutException =>
        false
      case ex: Exception =>
        ex.printStackTrace()
        println("Got error when reading message ")
        false
    }
  private def getUser(message: Array[Byte]): Option[User] = {
    // Deserialize the bytes and rebuild the generic record
    val reader: DatumReader[GenericRecord] = new SpecificDatumReader[GenericRecord](schema)
    val decoder: Decoder = DecoderFactory.get().binaryDecoder(message, null)
    val userData: GenericRecord = reader.read(null, decoder)

    // Build the domain User object; email is nullable in the schema, so wrap it in an Option
    val user = User(
      userData.get("id").toString.toInt,
      userData.get("name").toString,
      Option(userData.get("email")).map(_.toString)
    )
    Some(user)
  }
}
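A usage sketch (ConsumerApp is just an illustrative name): read() blocks until a message arrives or the consumer times out, and returns the decoded user, if any.
object ConsumerApp extends App {
  val consumer = new KafkaConsumer()
  consumer.read() match {
    case Some(user) => println(s"Received user: $user")
    case _          => println("No message received")
  }
}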
Conclusion
In this post, we have seen how to produce messages encoded with Avro, how to send them into Kafka, how to consume them, and finally how to decode them. This helps us build a messaging system for complex data with the help of Kafka and Avro.
The one thing you have to note is that the same Avro schema must be present on both sides (producer and consumer) to encode and decode messages. Any change to the schema must be applied on both sides. To overcome this problem, the Confluent Platform comes into play with its Schema Registry, which allows us to share Avro schemas and handle schema changes.
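With the Confluent serializers, for example, the producer no longer ships raw bytes itself; the serializer registers the schema and embeds only a schema id in each message. A rough configuration sketch (assuming a Schema Registry running at localhost:8081) looks like this:
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
// The KafkaAvroSerializer looks up/registers the record's schema in the registry
props.put("schema.registry.url", "http://localhost:8081")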
You can find the complete code on GitHub.