Apache Spark: Reading CSV Using Custom Timestamp Format
Here's the solution to a timestamp format issue that occurs when reading CSV in Spark for both Spark versions 2.0.1 or newer and for Spark versions 2.0.0 or older.
In this blog, we consider a situation where I want to read a CSV through Spark, but the CSV contains some timestamp columns. Is this going to be a problem when inferring the schema while reading the CSV with Spark?
Well, the answer may be no if the CSV has the timestamp fields in the specific yyyy-MM-dd HH:mm:ss format. In this particular case, the Spark CSV reader can infer them as timestamps, treating that as the default format.
id,name,age,joining_date,wedding_date
1,Joseph,25,1999-09-04 45:50:46,2014-11-22 00:00:00
val csvDataframe = session.sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "true")
  .option("treatEmptyValuesAsNulls", "true")
  .option("inferSchema", "true")
  .option("mode", "DROPMALFORMED")
  .load("<path of the csv file>")
csvDataframe.printSchema()
When you print the schema of the DataFrame after reading the CSV, you will see that every field has been inferred correctly by the CSV reader.
root
|-- id: integer (nullable = true)
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
|-- joining_date: timestamp (nullable = true)
|-- wedding_date: timestamp (nullable = true)
But what if the timestamp fields in the CSV are in some other timestamp format? (For example, MM-dd-yyyy hh mm ss format.)
The content of the CSV file will be:
id,name,age,joining_date,wedding_date
1,Joseph,25,09-04-1999 45 50 46,11-22-2014 00 00 00
In this case, Spark does not recognize the timestamp fields. It cannot parse them with the default format, so it infers those columns as string type.
When you print the DataFrame schema this time, the timestamp fields show up as strings:
root
|-- id: integer (nullable = true)
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
|-- joining_date: string (nullable = true)
|-- wedding_date: string (nullable = true)
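To see why the second layout falls through to string, here is a minimal sketch in plain Scala. It is only a stand-in for the inference step, not Spark's actual code: the object DefaultLayoutCheck and the method guessType are hypothetical names, and the check relies on java.sql.Timestamp.valueOf, which accepts only the yyyy-[m]m-[d]d hh:mm:ss[.f...] layout.

```scala
import java.sql.Timestamp
import scala.util.Try

object DefaultLayoutCheck {
  // Hypothetical helper, not part of Spark: reports "timestamp" when the value
  // is accepted by Timestamp.valueOf (the yyyy-MM-dd HH:mm:ss style layout),
  // and falls back to "string" otherwise.
  def guessType(value: String): String =
    if (Try(Timestamp.valueOf(value)).isSuccess) "timestamp" else "string"
}
```

Calling DefaultLayoutCheck.guessType("2014-11-22 00:00:00") reports a timestamp, while DefaultLayoutCheck.guessType("11-22-2014 00 00 00") falls back to string, which mirrors the two schemas shown above.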
The above issue has a solution that depends on the version of Spark we are using.
Solution 1: Using Spark Version 2.0.1 and Above
Here, you have the straightforward option timestampFormat to give any timestamp format while reading the CSV. We just have to add an extra option defining the custom timestamp format, like option("timestampFormat", "MM-dd-yyyy hh mm ss").
val csvDataframe = session.sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "true")
  .option("treatEmptyValuesAsNulls", "true")
  .option("inferSchema", "true")
  .option("mode", "DROPMALFORMED")
  .option("timestampFormat", "MM-dd-yyyy hh mm ss")
  .load("<path of the csv file>")
csvDataframe.printSchema()
In this way, the timestamp fields are inferred correctly even when the CSV file uses some other timestamp format.
root
|-- id: integer (nullable = true)
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
|-- joining_date: timestamp (nullable = true)
|-- wedding_date: timestamp (nullable = true)
Remember: This solution will work only in Spark versions greater than 2.0.0 (2.0.1 and above). If you have Spark version 2.0.0 or older, check out Solution 2 with a workaround.
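The effect of supplying a pattern can be sketched without Spark. The snippet below is an illustration under stated assumptions, not the reader's real implementation: PatternCheck and guessType are hypothetical names, and the pattern is interpreted with java.text.SimpleDateFormat, whose pattern letters the Spark 2.x timestampFormat option follows. Note that in these patterns HH is the 24-hour clock and hh is the 12-hour clock, so the sketch is exercised with HH.

```scala
import java.text.SimpleDateFormat
import scala.util.Try

object PatternCheck {
  // Hypothetical helper, not part of Spark: reports "timestamp" when the value
  // can be parsed with the caller-supplied pattern, and "string" otherwise.
  def guessType(value: String, timestampFormat: String): String = {
    val parser = new SimpleDateFormat(timestampFormat)
    if (Try(parser.parse(value)).isSuccess) "timestamp" else "string"
  }
}
```

With the pattern "MM-dd-yyyy HH mm ss", the value "11-22-2014 00 00 00" is now recognized as a timestamp, while a non-date value such as "Joseph" still falls back to string.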
Solution 2: Using Spark Version 2.0.0 or Older
In older versions of Spark, the timestampFormat option described above does not exist, but there is still a way to get the same result. Let the field be inferred as a string, and then explicitly cast the string field holding the timestamp value to a timestamp.
For this, you must know the columns that need to be converted to timestamps.
For example, I know all my timestamp fields end with _date. Those fields can then be explicitly converted to timestamps by parsing them with the known format.
import org.apache.spark.sql.functions.{col, unix_timestamp}

val csvDataframe = session.sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "true")
  .option("treatEmptyValuesAsNulls", "true")
  .option("inferSchema", "true")
  .option("mode", "DROPMALFORMED")
  .load("<path of the csv file>")
// Parse every column whose name ends with "_date" using the known pattern,
// then cast the resulting epoch seconds to a timestamp.
val updatedDF = csvDataframe.columns.filter(colName => colName.endsWith("_date"))
  .foldLeft(csvDataframe) { (outputDF, columnName) =>
    outputDF.withColumn(columnName, unix_timestamp(col(columnName), "MM-dd-yyyy hh mm ss").cast("timestamp"))
  }
updatedDF.printSchema()
This way, you will be able to get the correct data type for timestamp fields in other formats as well.
root
|-- id: integer (nullable = true)
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
|-- joining_date: timestamp (nullable = true)
|-- wedding_date: timestamp (nullable = true)
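The idea behind the workaround (parse the string to epoch seconds, turn the seconds into a timestamp, and fold over the matching column names) can be sketched in plain Scala. This is only an illustration: CastSketch, toEpochSeconds, and convertDates are hypothetical names, a Map stands in for one row of the DataFrame, the time zone is pinned to UTC so the result is reproducible, and the pattern uses HH (24-hour clock) rather than hh.

```scala
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.TimeZone
import scala.util.Try

object CastSketch {
  // Parse the text with the given pattern and return epoch seconds,
  // or None when the text does not match the pattern.
  def toEpochSeconds(text: String, pattern: String): Option[Long] = {
    val parser = new SimpleDateFormat(pattern)
    parser.setTimeZone(TimeZone.getTimeZone("UTC")) // assumption: fixed zone for a stable result
    Try(parser.parse(text).getTime / 1000L).toOption
  }

  // Fold over the keys ending in "_date" and replace each value with a
  // Timestamp (or null when parsing fails); other entries stay untouched.
  def convertDates(row: Map[String, String], pattern: String): Map[String, Any] =
    row.keys.filter(_.endsWith("_date")).foldLeft[Map[String, Any]](row) { (out, name) =>
      val converted = toEpochSeconds(row(name), pattern).map(s => new Timestamp(s * 1000L))
      out.updated(name, converted.orNull)
    }
}
```

For example, CastSketch.convertDates(Map("name" -> "Joseph", "wedding_date" -> "11-22-2014 00 00 00"), "MM-dd-yyyy HH mm ss") leaves name as a string and replaces wedding_date with a Timestamp value.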
Conclusion
When you read a CSV using Spark, you may have problems with timestamp fields whose format differs from the default one, i.e. yyyy-MM-dd HH:mm:ss. This blog has presented a solution to this timestamp format issue for both Spark versions 2.0.1 or newer and Spark versions 2.0.0 or older.
Published at DZone with permission of Jyotsna Karan. See the original article here.