On Some Aspects of Big Data Processing in Apache Spark, Part 4: Versatile JSON and YAML Parsers
In this post, I present versatile JSON and YAML parsers for a Spark application.
In my previous post, I presented design patterns to program Spark applications in a modular, maintainable, and serializable way. This time, I demonstrate how to configure versatile JSON and YAML parsers to be used in Spark applications.
A Spark application typically needs to ingest JSON data, transform it, and then save the result to a data source. YAML data, on the other hand, is needed primarily to configure Spark jobs. In both cases, the data needs to be parsed according to a predefined template. In a Java Spark application, these templates are POJOs. So how do we program a single parser method that can process a wide class of such POJO templates, with data taken from either a local or a distributed file system?
This post is organized as follows:
- Section 1 demonstrates how to configure a versatile compact JSON parser,
- Section 2 shows how to do the same for a YAML parser.
Fully workable code can be found here. Let's go.
JSON Parser
A Spark application may need to read a JSON file either from a local file system or from the Hadoop Distributed File System (HDFS). Typically, local files hold configuration data. Another scenario for reading local JSON data is to run, test, and debug a Spark application in local mode. In these cases, it is convenient to have a JSON parser that lets us quickly switch between a local file system and HDFS.
To attack this problem, let's see how the data streams differ for a local file system and HDFS. For a local file system, a DataInputStream is obtained the following way:
DataInputStream dataInputStream = new DataInputStream(
new FileInputStream("src/test/resources/file_name.json"));
An HDFS data stream, on the other hand, is obtained in a much more involved way:
final SparkSession sparkSession = SparkSession
.builder().config("//--config params--//")
.appName("APP_NAME_1")
//--enable other features--//
.getOrCreate();
SparkContext sparkContext = sparkSession.sparkContext();
final Configuration hadoopConf = sparkContext.hadoopConfiguration();
SerializableConfiguration scfg = new SerializableConfiguration(hadoopConf);
FileSystem fileSystem = FileSystem.get(scfg.value());
FSDataInputStream stream = fileSystem.open(new Path("//--hdfs file path--"));
First, a Spark session is created; the session is configured to use HDFS and other distributed data sources (for example, Apache Hive). Then, after four intermediate steps, an instance of a distributed FileSystem is obtained. Finally, a distributed FSDataInputStream is created.
The local and the distributed file streams are related:
public class FSDataInputStream extends DataInputStream implements Seekable, PositionedReadable, ...
So, both the local and the distributed data streams can be used in the same parser with a generic <? extends DataInputStream> input stream type. Let's try the following solution:
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.stream.JsonReader;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStreamReader;

public class ParserService {
    public static <T extends DataInputStream, B> B parseSingleDeclarationStream(T inputStream,
            Class<B> className) throws IOException {
        // A single date pattern applies to all date/time fields in the POJO.
        Gson gson = new GsonBuilder().setDateFormat("yyyy-MM-dd").create();
        JsonReader reader = new JsonReader(new InputStreamReader(inputStream, "UTF-8"));
        B message = gson.fromJson(reader, className);
        reader.close();
        return message;
    }
}
We use the Gson library to parse an input stream. We also need to provide a POJO class for this parser to work. Here is an example of how to use this parser (see the code for details):
DataInputStream dataInputStream = new DataInputStream(
new FileInputStream("src/test/resources/declaration.json"));
Declaration declaration = ParserService.parseSingleDeclarationStream(dataInputStream, Declaration.class);
assertEquals(declaration.getDeclaration_id(), "ABCDE12345");
assertEquals(declaration.getDeclaration_details().length, 1);
assertEquals(declaration.getDeclaration_details()[0].getDetail_1(), 10);
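Because FSDataInputStream extends DataInputStream, the very same method accepts a distributed stream without any changes. Here is a minimal sketch, assuming the fileSystem instance from the HDFS snippet above and a hypothetical HDFS path:
// "/data/declaration.json" is a hypothetical HDFS path used only for illustration
FSDataInputStream hdfsStream = fileSystem.open(new Path("/data/declaration.json"));
Declaration hdfsDeclaration = ParserService.parseSingleDeclarationStream(hdfsStream, Declaration.class);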
Here Declaration and DeclarationDetails are simple POJOs:
public class Declaration {
private String declaration_id;
private Short declaration_version;
private Short declaration_type;
private Date declaration_date;
private DeclarationDetails[] declaration_details;
//--getters and setters--//
}
public class DeclarationDetails {
private int detail_1;
private int detail_2;
//--getters and setters--//
}
So, this parser correctly handles child arrays and objects. Notice that Gson also parses date/time strings, as long as the strings comply with the single pattern set on the GsonBuilder ("yyyy-MM-dd" in this case).
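For reference, a declaration.json that satisfies the assertions above might look like this; the values not checked by the assertions (version, type, date, detail_2) are purely illustrative:
{
  "declaration_id": "ABCDE12345",
  "declaration_version": 1,
  "declaration_type": 2,
  "declaration_date": "2021-03-15",
  "declaration_details": [
    { "detail_1": 10, "detail_2": 20 }
  ]
}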
YAML Parser
In our project, we use local YAML files to specify Spark jobs. For example, a basic Spark job configuration to transfer data from a Postgres database to a Hive database looks like this:
---
jobName: JOB_1
hiveTable: hive_table_1
rdbTable: rdb_table_1
---
jobName: JOB_2
hiveTable: hive_table_2
rdbTable: rdb_table_2
Here, we need to transfer data from the relational database tables (rdbTables) to the corresponding Hive tables (hiveTables). We need to upload and parse this config data to get a List<JobConfig>, where a JobConfig is:
public class JobConfig {
    private String jobName;
    private String hiveTable;
    private String rdbTable;
    //--setters and getters--//
}
The following parser solves this problem:
// YamlDecoder and YamlStream come from the Jyaml library (the package is assumed to be org.ho.yaml).
import org.ho.yaml.YamlDecoder;
import org.ho.yaml.YamlStream;
import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.util.ArrayList;
import java.util.List;

public class ParserService {
    public static <T extends JobConfig> List<T> getLocalFSJobList(String path,
            Class<T> className) throws FileNotFoundException {
        List<T> list = new ArrayList<>();
        DataInputStream dataStream = new DataInputStream(
                new FileInputStream(path));
        YamlDecoder dec = new YamlDecoder(dataStream);
        // Each YAML document in the file becomes one instance of className.
        YamlStream<T> stream = dec.asStreamOfType(className);
        while (stream.hasNext()) {
            T item = stream.next();
            list.add(item);
        }
        return list;
    }
}
Here we use YamlDecoder from the Jyaml library. We use a local DataInputStream, although a child stream class, as in the previous section, also works. In addition, this parser uses a <T extends JobConfig> output type. Such a more specific output type allows us to add extra parameters to the basic JobConfig class.
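For instance, here is a minimal sketch of such an extension; the class name, the extra field, and the file path are hypothetical:
// Hypothetical subclass that adds one extra parameter to the basic JobConfig.
public class ExtendedJobConfig extends JobConfig {
    private String partitionColumn;
    public String getPartitionColumn() { return partitionColumn; }
    public void setPartitionColumn(String partitionColumn) { this.partitionColumn = partitionColumn; }
}
// The same parser method returns the more specific type.
List<ExtendedJobConfig> jobs =
    ParserService.getLocalFSJobList("src/test/resources/extended_jobs.yml", ExtendedJobConfig.class);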
Notice that it is not strictly necessary to extend the JobConfig superclass for this algorithm to work. I added this constraint because YAML files are usually local and specify Spark jobs, so more general types are not necessary.
This parser runs the following way:
@Test
public void testYmlParser() throws FileNotFoundException {
    List<JobConfig> jobs = ParserService.getLocalFSJobList("src/test/resources/jobs.yml", JobConfig.class);
    assertEquals(jobs.size(), 2);
    assertEquals(jobs.get(0).getJobName(), "JOB_1");
    assertEquals(jobs.get(0).getRdbTable(), "rdb_table_1");
    assertEquals(jobs.get(1).getJobName(), "JOB_2");
    assertEquals(jobs.get(1).getRdbTable(), "rdb_table_2");
}
The parser correctly recognizes multiple jobs in a single file and assembles the jobs into a list. See the code for details.
Conclusions
In this post, I demonstrated how to program versatile JSON and YAML parsers. The parsers can be easily configured for local and distributed file systems, and they can accept a wide range of POJO templates to parse data. I hope these tricks help you in your project.