Convert RDD to DataFrame with Spark
Learn how to convert an RDD to a DataFrame using the Databricks Spark CSV library.
As I mentioned in a previous blog post, I’ve been playing around with the Databricks Spark CSV library and wanted to take a CSV file, clean it up, and then write out a new CSV file containing some of the columns.
I started by processing the CSV file and writing it into a temporary table:
import org.apache.spark.sql.{SQLContext, Row, DataFrame}

val sqlContext = new SQLContext(sc)
val crimeFile = "Crimes_-_2001_to_present.csv"

// Load the CSV (treating the first line as a header) and register it as a temporary table
sqlContext.load("com.databricks.spark.csv", Map("path" -> crimeFile, "header" -> "true")).registerTempTable("crimes")
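As a quick sanity check, something like this confirms the table is queryable:

// Hypothetical sanity check: the temp table should now answer SQL queries
sqlContext.sql("SELECT COUNT(*) FROM crimes").collect()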
I wanted to get to the point where I could call the following function, which writes a DataFrame to disk:

import java.io.File
import org.apache.hadoop.fs.FileUtil

private def createFile(df: DataFrame, file: String, header: String): Unit = {
  FileUtil.fullyDelete(new File(file))
  val tmpFile = "tmp/" + System.currentTimeMillis() + "-" + file
  df.distinct.save(tmpFile, "com.databricks.spark.csv")
}
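Note that the header parameter isn’t used in this snippet; presumably it feeds a later step that merges Spark’s part files into a single CSV with a header line. A minimal sketch of such a helper, assuming Hadoop 2.x’s FileUtil.copyMerge (the helper name mergeFiles is mine), might look like:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

// Hypothetical helper: concatenate the part files Spark wrote under srcDir into dstFile.
// copyMerge only concatenates; actually prepending the header line would need an
// extra rewrite of dstFile, which is omitted here.
private def mergeFiles(srcDir: String, dstFile: String, header: String): Unit = {
  val conf = new Configuration()
  val fs = FileSystem.get(conf)
  FileUtil.copyMerge(fs, new Path(srcDir), fs, new Path(dstFile), true, conf, null)
}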
The first file only needs to contain the primary type of crime, which we can extract with the following query:
val rows = sqlContext.sql("select `Primary Type` as primaryType FROM crimes LIMIT 10")
rows.collect()
res4: Array[org.apache.spark.sql.Row] = Array([ASSAULT], [ROBBERY], [CRIMINAL DAMAGE], [THEFT], [THEFT], [BURGLARY], [THEFT], [BURGLARY], [THEFT], [CRIMINAL DAMAGE])
Some of the primary types have trailing spaces, which I want to get rid of. As far as I can tell, Spark’s variant of SQL doesn’t have the LTRIM or RTRIM functions, but we can map over ‘rows’ and use the String ‘trim’ function instead:
rows.map { case Row(primaryType: String) => Row(primaryType.trim) }
res8: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[29] at map at DataFrame.scala:776
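For the record, a quick hypothetical check like the following confirms that some of the raw values really do carry trailing whitespace:

// Hypothetical check: count raw values that would change under trim
rows.rdd.filter { case Row(primaryType: String) => primaryType != primaryType.trim }.count()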
Now we’ve got an RDD of Rows, which we need to convert back into a DataFrame. ‘sqlContext’ has a function we might be able to use:
sqlContext.createDataFrame(rows.map { case Row(primaryType: String) => Row(primaryType.trim) })
<console>:27: error: overloaded method value createDataFrame with alternatives:
[A <: Product](data: Seq[A])(implicit evidence$4: reflect.runtime.universe.TypeTag[A])org.apache.spark.sql.DataFrame <and>
[A <: Product](rdd: org.apache.spark.rdd.RDD[A])(implicit evidence$3: reflect.runtime.universe.TypeTag[A])org.apache.spark.sql.DataFrame
cannot be applied to (org.apache.spark.rdd.RDD[org.apache.spark.sql.Row])
sqlContext.createDataFrame(rows.map { case Row(primaryType: String) => Row(primaryType.trim) })
^
The error message shows the two signatures we can choose from: both expect a Product type such as a case class, not Row. So if we want to pass in an RDD of type Row we’re going to have to define a StructType, or we can convert each row into something more strongly typed:
case class CrimeType(primaryType: String)
sqlContext.createDataFrame(rows.map { case Row(primaryType: String) => CrimeType(primaryType.trim) })
res14: org.apache.spark.sql.DataFrame = [primaryType: string]
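For completeness, the StructType route would look something like this sketch, using the createDataFrame overload that takes an RDD[Row] plus a schema (the column name and nullability here are my assumptions):

import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Sketch: declare the schema explicitly instead of relying on a case class
val schema = StructType(Seq(StructField("primaryType", StringType, nullable = true)))
sqlContext.createDataFrame(
  rows.map { case Row(primaryType: String) => Row(primaryType.trim) },
  schema)

Either way we end up with the same single-column DataFrame.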
Great, we’ve got our DataFrame, which we can now plug into the ‘createFile’ function like so:
createFile(
  sqlContext.createDataFrame(rows.map { case Row(primaryType: String) => CrimeType(primaryType.trim) }),
  "/tmp/crimeTypes.csv",
  "crimeType:ID(CrimeType)")
We can actually do better though!
Since we’ve got an RDD of a specific class we can make use of the ‘rddToDataFrameHolder’ implicit function and then the ‘toDF’ function on ‘DataFrameHolder’. This is what the code looks like:
import sqlContext.implicits._

createFile(
  rows.map { case Row(primaryType: String) => CrimeType(primaryType.trim) }.toDF(),
  "/tmp/crimeTypes.csv",
  "crimeType:ID(CrimeType)")
And we’re done!
Published at DZone with permission of Mark Needham, DZone MVB.