Transforming Data Into JSON Structure With Spark: API-Based and SQL-Based Approaches
In this blog, we will explore transformations in Spark using API-based and SQL-based approaches to convert data into JSON payloads for building pipelines.
In the world of data processing and analytics, Apache Spark has emerged as a powerful tool that empowers developers and data engineers to handle structured and semi-structured data efficiently. Its distributed processing capabilities allow it to manage large datasets and execute transformations in parallel across multiple partitions.
When working with large volumes of data, Spark partitions the data and distributes it across a cluster of machines, performing transformations on each partition independently for improved performance and scalability. In this article, we will explore how to read data from different tables, perform join operations, and transform the result into a JSON structure using Java Spark SQL code. JSON is a widely used format for exchanging data between systems.
Because the data is spread across multiple partitions, Spark can send the transformed data to different target systems in parallel; for example, each transformed JSON payload can be posted to a REST API endpoint, as sketched below. This allows for seamless integration between Spark SQL and other systems in a distributed environment.
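As a minimal sketch of that pattern, the snippet below iterates over each partition of the final DataFrame (resultDF, produced later in this article) and POSTs every JSON record to a REST endpoint; the URL is purely illustrative.

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

import org.apache.spark.api.java.function.ForeachPartitionFunction;

// Convert each row to a JSON string and send the payloads partition by partition.
resultDF.toJSON().foreachPartition((ForeachPartitionFunction<String>) partition -> {
    while (partition.hasNext()) {
        String jsonPayload = partition.next();
        HttpURLConnection conn =
                (HttpURLConnection) new URL("https://example.com/students").openConnection(); // placeholder endpoint
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);
        try (OutputStream os = conn.getOutputStream()) {
            os.write(jsonPayload.getBytes(StandardCharsets.UTF_8));
        }
        conn.getResponseCode(); // read the response so the request completes
        conn.disconnect();
    }
});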
Additionally, JSON is well suited for publishing data to Apache Kafka, a distributed streaming platform. Kafka enables efficient and scalable processing of data streams, and its messaging system accepts messages in various formats. By transforming records into JSON, you can publish them to Kafka topics, allowing downstream applications to process the events in real time.
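As a rough sketch (assuming the spark-sql-kafka-0-10 connector is on the classpath, and with placeholder broker and topic names), the transformed DataFrame can be published to a Kafka topic in batch mode:

// The Kafka sink expects the message content in a column named "value".
resultDF.toJSON()
        .toDF("value")
        .write()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
        .option("topic", "student-events")                   // placeholder topic
        .save();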
Moreover, data transformed into a JSON structure can be stored in NoSQL databases like MongoDB. NoSQL databases are designed to handle large volumes of unstructured data, and the JSON format maps naturally onto their document model, simplifying both storage and querying.
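For completeness, here is a hedged sketch of writing the same DataFrame to MongoDB, assuming the MongoDB Spark Connector (10.x) is on the classpath; the connection URI, database, and collection names are placeholders:

// Each JSON document becomes one MongoDB document in the target collection.
resultDF.write()
        .format("mongodb")
        .option("connection.uri", "mongodb://localhost:27017") // placeholder URI
        .option("database", "school")                          // placeholder database
        .option("collection", "students")                      // placeholder collection
        .mode("append")
        .save();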
Spark can connect to various data sources, such as relational databases, streaming systems, and cloud-based storage. In this article, we will focus on reading data from three CSV files and discuss two approaches: an API-based approach and a SQL-based approach.
Students Table Schema:
student_id: Integer
name: String
age: Integer
gender: String
Courses Table Schema:
student_id: Integer
course_id: Integer
course_name: String
Address Table Schema:
student_id: Integer
street: String
city: String
state: String
country: String
zip_code: String
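For reference, the input files used in the examples might look like the following; the sample values simply mirror the JSON output shown at the end of the article.

students.csv
student_id,name,age,gender
1,Foo,25,Male

courses.csv
student_id,course_id,course_name
1,101,Mathematics
1,102,Physics

address.csv
student_id,street,city,state,country,zip_code
1,123 Main St,Dallas,Texas,USA,10001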
For both approaches, add the following Maven dependencies to pom.xml:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.13</artifactId>
    <version>3.3.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.13</artifactId>
    <version>3.3.0</version>
</dependency>
Approach 1: API-Based Approach
We first need to create a SparkSession, which acts as the entry point for all Spark operations.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;

public class JsonDataTransformation {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("Json Data Transformation")
                .getOrCreate();

        // Read data from the tables
        Dataset<Row> studentsDF = spark.read()
                .format("csv")
                .option("header", "true")
                .load("students.csv");

        Dataset<Row> coursesDF = spark.read()
                .format("csv")
                .option("header", "true")
                .load("courses.csv");

        Dataset<Row> addressDF = spark.read()
                .format("csv")
                .option("header", "true")
                .load("address.csv");

        // Join the three tables on student_id
        Dataset<Row> joinedDF = studentsDF
                .join(coursesDF, studentsDF.col("student_id").equalTo(coursesDF.col("student_id")), "inner")
                .join(addressDF, studentsDF.col("student_id").equalTo(addressDF.col("student_id")), "inner");

        // Group by student and build the nested courses array and address struct
        Dataset<Row> resultDF = joinedDF.groupBy(studentsDF.col("student_id"))
                .agg(
                        functions.first(studentsDF.col("name")).as("name"),
                        functions.first(studentsDF.col("age")).as("age"),
                        functions.first(studentsDF.col("gender")).as("gender"),
                        functions.collect_list(
                                functions.struct(coursesDF.col("course_id"), coursesDF.col("course_name"))
                        ).as("courses"),
                        functions.struct(
                                functions.first(addressDF.col("street")).as("street"),
                                functions.first(addressDF.col("city")).as("city"),
                                functions.first(addressDF.col("state")).as("state"),
                                functions.first(addressDF.col("country")).as("country"),
                                functions.first(addressDF.col("zip_code")).as("zip_code")
                        ).as("address")
                );

        // Write the result to a JSON file
        resultDF.write().format("json").save("output.json");
    }
}
In the above code snippet, we start by creating a SparkSession and reading the data from the three files: "students," "courses," and "address." We then perform join operations using the join method, specifying the common column between the tables.
Next, we transform the joined DataFrame into the desired JSON structure using the groupBy and agg functions. Finally, we write the transformed data into a JSON file.
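One caveat with CSV sources: with only the header option set, Spark reads every column as a string, so numeric fields such as student_id, age, and course_id would be written as strings in the JSON output. Enabling schema inference (or supplying an explicit schema) preserves the numeric types shown in the example output; a minimal sketch:

Dataset<Row> studentsDF = spark.read()
        .format("csv")
        .option("header", "true")
        .option("inferSchema", "true") // infer integer columns instead of reading everything as strings
        .load("students.csv");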
Approach 2: SQL-Based Approach
Spark SQL allows us to write SQL queries directly:
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class JsonDataTransformation {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("Json Data Transformation")
                .getOrCreate();

        // Read data from the tables
        Dataset<Row> studentsDF = spark.read()
                .format("csv")
                .option("header", "true")
                .load("students.csv");

        Dataset<Row> coursesDF = spark.read()
                .format("csv")
                .option("header", "true")
                .load("courses.csv");

        Dataset<Row> addressDF = spark.read()
                .format("csv")
                .option("header", "true")
                .load("address.csv");

        // Register the DataFrames as temporary views
        studentsDF.createOrReplaceTempView("students");
        coursesDF.createOrReplaceTempView("courses");
        addressDF.createOrReplaceTempView("address");

        // Aggregate courses into an array of structs per student and replace the view
        spark.sql("SELECT student_id, COLLECT_LIST(STRUCT(course_id, course_name)) AS Courses " +
                "FROM courses " +
                "GROUP BY student_id").createOrReplaceTempView("courses");

        // Collapse the address columns into a single struct per student and replace the view
        spark.sql("SELECT student_id, STRUCT(street, city, state, country, zip_code) AS Address " +
                "FROM address").createOrReplaceTempView("address");

        // Perform the SQL query to obtain the desired JSON structure
        Dataset<Row> resultDF = spark.sql("SELECT " +
                "s.student_id, " +
                "s.name, " +
                "s.age, " +
                "s.gender, " +
                "tc.Courses AS courses, " +
                "ta.Address AS address " +
                "FROM students s " +
                "LEFT JOIN courses tc ON s.student_id = tc.student_id " +
                "LEFT JOIN address ta ON s.student_id = ta.student_id");

        // Write the result to a JSON file
        resultDF.write().format("json").save("output.json");
    }
}
In the code snippet above, we use the createOrReplaceTempView method to register each table as a temporary view under its corresponding name. We first aggregate the courses rows into a JSON array and the address columns into a JSON object, replacing the original views with these results, and then write a SQL query that joins the views on their common key. Finally, we write the transformed data to a JSON file.
Replacing the original temp views with the output of each transformation is especially helpful for a configuration-driven pipeline, where every step reads from and writes back to the same named views.
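As a rough sketch of that idea (the step list below is hypothetical and simply re-expresses the two SQL statements from the example as configuration), each configured step runs a SQL statement and re-registers its result under the same view name, so later steps always see the latest shape of the data:

import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.SparkSession;

public class ConfigDrivenPipeline {

    // Hypothetical configuration: each entry pairs a temp-view name with the
    // SQL statement whose result replaces that view.
    static void runSteps(SparkSession spark) {
        List<String[]> steps = Arrays.asList(
                new String[]{"courses",
                        "SELECT student_id, COLLECT_LIST(STRUCT(course_id, course_name)) AS Courses FROM courses GROUP BY student_id"},
                new String[]{"address",
                        "SELECT student_id, STRUCT(street, city, state, country, zip_code) AS Address FROM address"});

        for (String[] step : steps) {
            spark.sql(step[1]).createOrReplaceTempView(step[0]);
        }
    }
}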
By using the same approach and transformations, we can extract data from multiple tables and map their fields to a single object or an array in the target JSON document. We can also spread fields from one table across multiple objects in the resulting JSON structure. For larger datasets, partitions can be processed in parallel.
Example of the transformed JSON structure:
{
    "student_id": 1,
    "name": "Foo",
    "age": 25,
    "gender": "Male",
    "courses": [
        {
            "course_id": 101,
            "course_name": "Mathematics"
        },
        {
            "course_id": 102,
            "course_name": "Physics"
        }
    ],
    "address": {
        "street": "123 Main St",
        "city": "Dallas",
        "state": "Texas",
        "country": "USA",
        "zip_code": "10001"
    }
}
Conclusion
To summarize, we have explored two approaches for reading data from multiple tables and transforming it into a JSON structure using Spark. The API-based approach uses Spark's DataFrame API to perform transformations, while the SQL-based approach allows us to write SQL queries directly. Both approaches provide flexibility and efficiency for handling complex data transformation tasks when building pipelines.