Transformations of Varying JSON Payloads Using Spark Streaming
With Spark Streaming, you just have to create a read stream from the data source and then create a write stream to load the data into a target data source.
Spark Streaming can be used to read data from a source in a streaming fashion. We just have to create a read stream from the data source, and then we can create a write stream to load the data into a target data source.
For this demo, I will assume that we have different JSON payloads coming into a Kafka topic, which we need to transform and write to another Kafka topic.
Create a ReadStream
The topic we subscribe to receives the JSON payloads as messages continuously. We first need to read those messages and create a DataFrame using Spark's readStream. Spark provides the readStream function, and we can use it to create a read stream that reads the streaming payloads from the Kafka topic.
val rawData = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
We can create a case class (e.g. CustomerUnion) that contains all the possible fields of the JSON payloads, so that we can run select queries on top of the DataFrame without any failure.
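A minimal sketch of such a case class, assuming the fields shown in the sample payloads below (the field names and types are assumptions for illustration; Option fields let Spark fill NULL when a payload omits them):

// Hypothetical union of all fields seen across the incoming payloads
case class CustomerUnion(
  id: Option[Long],
  firstName: Option[String],
  lastName: Option[String],
  City: Option[String],
  Email: Option[String],
  Phone: Option[String]
)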
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.StructType
import spark.implicits._
val rawDfValue = rawData.selectExpr("CAST(value AS STRING)").as[String]
val schema = ScalaReflection.schemaFor[CustomerUnion].dataType.asInstanceOf[StructType]
val extractedDFWithSchema = rawDfValue.select(from_json(col("value"), schema).as("data")).select("data.*")
extractedDFWithSchema.createOrReplaceTempView("tempView")
This gives us a DataFrame, extractedDFWithSchema, whose columns are the fields of the payload.
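As a quick sanity check, and assuming the hypothetical CustomerUnion sketched above, printing the schema should show every field as a nullable column:

extractedDFWithSchema.printSchema()
// root
//  |-- id: long (nullable = true)
//  |-- firstName: string (nullable = true)
//  |-- lastName: string (nullable = true)
//  |-- City: string (nullable = true)
//  |-- Email: string (nullable = true)
//  |-- Phone: string (nullable = true)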
Sample Input Payloads
Below are two sample input payloads, but there can be many more, each with some fields missing (variable).
{
  "id": 1234,
  "firstName": "Jon",
  "lastName": "Butler",
  "City": "Newyork",
  "Email": "abc@gmail.com",
  "Phone": "2323123"
}
{
  "firstName": "Jon",
  "lastName": "Butler",
  "City": "Newyork",
  "Email": "abc@gmail.com",
  "Phone": "2323123"
}
Sample Output Payloads
We will decide the output payload based upon the id field. If an id field is present, we will consider it a user-update case and send only "Email" and "Phone" in the output payload. We can pick any fields based upon certain conditions; this is just an example.
If id is not present, we will send all the fields. So, below are two sample output payloads:
{
  "userid": 1234,
  "Email": "abc@gmail.com",
  "Phone": "2323123"
}
{
  "fullname": "Jon Butler",
  "City": "Newyork",
  "Email": "abc@gmail.com",
  "Phone": "2323123"
}
Start WriteStreams
Once we have the DataFrame, we can run as many SQL queries as we need and write to Kafka topics per the desired payloads. For this, we can create a list of all the SQL queries, loop through the list, and call the writeStream function for each one. Let's suppose we have a list called queryList, which contains nothing but strings (i.e. SQL queries).
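As a sketch, queryList for the two sample output payloads above could look like the following (the exact queries are assumptions for illustration; each query must produce key and value columns, because the write stream defined below casts them to strings before writing to Kafka):

// Hypothetical queries matching the sample output payloads above
val queryList = List(
  // id present: treat it as a user update and send only contact details
  """SELECT CAST(id AS STRING) AS key,
            to_json(named_struct('userid', id, 'Email', Email, 'Phone', Phone)) AS value
     FROM tempView WHERE id IS NOT NULL""",
  // id absent: send all the fields
  """SELECT Email AS key,
            to_json(named_struct('fullname', concat(firstName, ' ', lastName),
                                 'City', City, 'Email', Email, 'Phone', Phone)) AS value
     FROM tempView WHERE id IS NULL"""
)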
Below, we define a function for the write stream:
def startWriteStream(query: String): Unit = {
  val transformedDf = spark.sql(query)
  transformedDf
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("topic", "topic2") // the target topic
    .option("checkpointLocation", "/path/to/checkpoint") // required for the Kafka sink; use a distinct path per query
    .start()
}
This will start the write stream for each query we have in the list.
queryList.foreach(startWriteStream)
spark.streams.awaitAnyTermination()
Conclusion
If we are aware of all the possible fields of the input payload, our SQL queries won't fail even if some fields are not present. Since we already specify the schema of the payload as a case class, the DataFrame is created with NULL for the absent fields.
This way, we can utilise Spark Streaming to write multiple payloads from the same topic to different topics after the desired transformations/filters.