Implementing Hadoop's Input and Output Format in Spark
A detailed tutorial on how to use Apache Spark to implement Hadoop input and output formats.
In this post, we will discuss how to implement Hadoop input and output formats in Spark.
To understand the concepts explained here, it is best to have some basic knowledge of Apache Spark. We recommend reading our earlier posts, Beginners Guide to Spark and Spark RDDs in Scala, before continuing with this one.
Now, let's discuss the input formats of Hadoop. An input split is a chunk of the data stored in HDFS, and each mapper works on one input split. Before the records reach the map method, a RecordReader processes the input split and presents the records as key-value pairs.
InputFormat describes the input specification for a Map-Reduce job. The Map-Reduce framework relies on the InputFormat of the job to do the following:
- Validate the input specification of the job.
- Split up the input file(s) into logical InputSplits, each of which is then assigned to an individual Mapper.
- Provide the RecordReader implementation to be used to glean input records from the logical InputSplit for processing by the Mapper.
The default behavior of file-based InputFormats, typically sub-classes of FileInputFormat, is to split the input into logical InputSplits based on the total size, in bytes, of the input files. However, the FileSystem block size of the input files is treated as an upper bound for input splits. A lower bound on the split size can be set via mapreduce.input.fileinputformat.split.minsize.
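For example, a minimal sketch of raising that lower bound on a Hadoop Configuration (the 64 MB value is only illustrative):
import org.apache.hadoop.conf.Configuration
// Set the minimum split size to 64 MB (illustrative value).
val hadoopConf = new Configuration()
hadoopConf.setLong("mapreduce.input.fileinputformat.split.minsize", 64L * 1024 * 1024)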
By default, Hadoop uses TextInputFormat, where the key of each record is the byte offset of the line and the value is the line itself. A related format, KeyValueTextInputFormat, splits each line into a key and a value at the first tab character. The keys and values used in Hadoop are serialized as Writable types.
HadoopInputFormat
In Spark, we can use Hadoop's InputFormat classes to process data, just as in Hadoop. For this, Spark exposes Hadoop APIs in Java, Scala, and Python.
Now, let's look at how to use Hadoop input formats in Spark.
Spark supports both the old and new APIs of Hadoop. They are as follows:
Old APIs (which use the mapred libraries of Hadoop)
- hadoopRDD
- hadoopFile
New APIs (which use the mapreduce libraries of Hadoop)
- newAPIHadoopRDD
- newAPIHadoopFile
We can call these APIs on the SparkContext as shown below.
Old APIs:
SparkContext.hadoopRDD(conf, inputFormatClass, keyClass, valueClass, minPartitions)
Here's the explanation of the above line (a minimal sketch follows the list):
- conf – The conf to be passed is an org.apache.hadoop.mapred.JobConf. With this form, the input path must be set on the configuration itself.
- inputFormatClass – The Hadoop input format class.
- keyClass – The input key class.
- valueClass – The input value class.
- minPartitions – The minimum number of partitions (optional).
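For instance, a minimal sketch that reads a plain text file with the old-API (mapred) TextInputFormat, assuming a SparkContext named sc (as in the spark-shell); the input path is only an example:
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileInputFormat, JobConf, TextInputFormat}
// Old (mapred) API: the input path is set on the JobConf itself.
val jobConf = new JobConf()
FileInputFormat.setInputPaths(jobConf, "/home/kiran/Hadoop_input")
val lines = sc.hadoopRDD(jobConf, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], 2)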
SparkContext.hadoopFile(path, inputFormatClass, keyClass, valueClass, minPartitions)
The above line can be explained like so (see the sketch after this list):
- path – The input file or directory path, passed directly as an argument.
- inputFormatClass – The Hadoop input format class.
- keyClass – The input key class.
- valueClass – The input value class.
- minPartitions – The minimum number of partitions.
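A minimal sketch with hadoopFile under the same assumptions (the path and partition count are only examples):
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
// Old (mapred) API: the input path is passed directly, with a minimum of 2 partitions.
val lines = sc.hadoopFile("/home/kiran/Hadoop_input", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], 2)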
New APIs:
SparkContext.newAPIHadoopRDD(conf, fClass, kClass, vClass)
Here, the conf to be passed is an org.apache.hadoop.conf.Configuration, and the input path must be set on that configuration (a minimal sketch follows the list).
- fClass – The Hadoop input format class.
- kClass – The input key class.
- vClass – The input value class.
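A minimal sketch, assuming the input path is supplied through the Configuration (the path is only an example):
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
// New (mapreduce) API: the input path is set on the Configuration.
val conf = new Configuration()
conf.set("mapreduce.input.fileinputformat.inputdir", "/home/kiran/Hadoop_input")
val lines = sc.newAPIHadoopRDD(conf, classOf[TextInputFormat], classOf[LongWritable], classOf[Text])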
SparkContext.newAPIHadoopFile(path, fClass, kClass, vClass, conf)
Here, path is the input file path, and the conf to be passed is an org.apache.hadoop.conf.Configuration (conf is optional).
- fClass – The Hadoop input format class.
- kClass – The input key class.
- vClass – The input value class.
Now, let's look at a demo using one input file. The input data we are using here, with the key and value separated by a tab, is:
Manjunath	50,000
Kiran	40,0000
Onkar	45,0000
Prateek	45,0000
Now, we will implement KeyValueTextInputFormat on this data.
val input = sc.newAPIHadoopFile("/home/kiran/Hadoop_input", classOf[KeyValueTextInputFormat], classOf[Text], classOf[Text])
To pass a Java class to these APIs from Scala, we use the syntax classOf[ClassName].
Here, KeyValueTextInputFormat and Text are Hadoop classes written in Java. In place of these, we can also use our own custom input format and Writable classes, which will be discussed in our next post. A fuller sketch of this demo, with the required imports, follows.
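A minimal sketch of the same demo, assuming a SparkContext named sc; the Writables are converted to Strings before collecting, since Hadoop Writables are not Java-serializable:
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat
val input = sc.newAPIHadoopFile("/home/kiran/Hadoop_input", classOf[KeyValueTextInputFormat], classOf[Text], classOf[Text])
// Convert the Hadoop Writables to plain Strings before collecting.
input.map { case (name, salary) => (name.toString, salary.toString) }.collect().foreach(println)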
HadoopOutputFormat
Let's look at Hadoop output formats and how to implement them in Spark. By default, Hadoop uses TextOutputFormat, which writes each output key and value to the part file separated by a tab character.
The same can be done in Spark, which provides methods for both the old and new Hadoop APIs, i.e., for both the mapred and mapreduce output formats. These methods are available on pair RDDs.
The old (mapred) API method is saveAsHadoopFile:
RDD.saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass, conf, codec)
The explanation of the above line is as follows:
- path – The path where the output should be saved.
- keyClass – The output key class.
- valueClass – The output value class.
- outputFormatClass – The Hadoop output format class.
- conf – The Hadoop configuration, an org.apache.hadoop.mapred.JobConf.
- codec – The compression codec class.
- conf and codec are optional parameters.
The new (mapreduce) API method is saveAsNewAPIHadoopFile:
RDD.saveAsNewAPIHadoopFile(path, keyClass, valueClass, outputFormatClass, conf)
The explanation of the above line is as follows:
- path – The path where the output should be saved.
- keyClass – The output key class.
- valueClass – The output value class.
- outputFormatClass – The Hadoop output format class.
- conf – The Hadoop configuration, an org.apache.hadoop.conf.Configuration (optional).
In place of keyClass, valueClass, and outputFormatClass, we can define and pass our own custom classes, including custom output formats.
Now, let's save the output of the above HadoopInputFormat example using a Hadoop output format.
First, we save it using the old API of Hadoop, i.e., the mapred libraries.
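A minimal sketch, reusing the input RDD from the example above; the output path is hypothetical:
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.TextOutputFormat
// Old (mapred) API: save the (Text, Text) pairs; the output path is hypothetical.
input.saveAsHadoopFile("/home/kiran/Hadoop_output_old", classOf[Text], classOf[Text], classOf[TextOutputFormat[Text, Text]])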
Next, we will save the same output using the new API of Hadoop, i.e., the mapreduce libraries.
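A minimal sketch with the same input RDD and another hypothetical output path:
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
// New (mapreduce) API: save the (Text, Text) pairs; the output path is hypothetical.
input.saveAsNewAPIHadoopFile("/home/kiran/Hadoop_output_new", classOf[Text], classOf[Text], classOf[TextOutputFormat[Text, Text]])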
This is how output is saved in Spark using Hadoop output formats!
We hope this post has been helpful in understanding how to work with Hadoop input and output formats in Spark. If you have any queries, feel free to drop us a comment and we will get back to you at the earliest.