The Right Way to Use Spark and JDBC
Apache Spark is a wonderful tool, but sometimes it needs a bit of tuning. We look at a use case involving reading data from a JDBC source.
A while ago I had to read data from a MySQL table, manipulate it a bit, and store the results on disk.
The obvious choice was Spark, since I was already using it for other things and it seemed super easy to implement.
This is more or less what I had to do (I removed the manipulation part for the sake of simplicity):
spark.read.format("jdbc").
option("url", "jdbc:mysql://dbhost/sbschhema").
option("dbtable", "mytable").
option("user", "myuser").
option("password", "mypassword").
load().write.parquet("/data/out")
Looks good, only it didn't quite work. Depending on the size of the table, it was either super slow or it crashed outright.
Tuning Spark and the cluster properties helped a bit, but it didn't solve the problem.
Since I was using AWS EMR, it made sense to give Sqoop a try, as it is one of the applications supported on EMR.
sqoop import --verbose --connect jdbc:mysql://dbhost/sbschhema --username myuser --table opportunity --password mypassword --m 20 --as-parquetfile --target-dir /data/out
Sqoop performed much better almost instantly. All I needed to do was set the number of mappers according to the size of the data, and it worked perfectly.
Sqoop runs on Hadoop MapReduce, and Spark distributes its work across parallel tasks in much the same way, so Spark should be able to do at least as well as Sqoop; I only needed to find out how. I decided to look more closely at what Sqoop does to see whether I could imitate it with Spark.
Turning on Sqoop's verbose flag gives a lot more detail. What I found was that Sqoop splits the input among its mappers, which makes sense; this is MapReduce after all, and Spark does the same kind of thing with its tasks. But before doing that, Sqoop does something smart that Spark doesn't do on its own.
It first fetches the primary key (unless you give it another key to split the data by), then checks that key's minimum and maximum values. Then it lets each mapper query the data with different boundaries for the key, so that the rows are split evenly between the mappers (as long as the key values are spread fairly evenly).
If, for example, the key's minimum value is 1, its maximum value is 100, and there are 5 mappers, then the query of the first mapper will look like this:
select * from mytable where mykey >= 1 and mykey <= 20;
And the query for the second mapper will look like this:
select * from mytable where mykey >= 21 and mykey <= 40;
And so on.
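To make the splitting concrete, here is a minimal Scala sketch of the idea: it cuts an inclusive key range into near-equal chunks and builds one query per mapper. The names `splitRanges` and `splitQueries` are mine, purely for illustration. This is not Sqoop's actual code, and Sqoop's real splitter may place the boundaries differently.

```scala
// Hypothetical helper: split the inclusive key range [min, max] into at most
// `numMappers` contiguous, non-overlapping ranges of near-equal size.
def splitRanges(min: Long, max: Long, numMappers: Int): Seq[(Long, Long)] = {
  require(numMappers > 0, "numMappers must be positive")
  require(max >= min, "max must not be smaller than min")
  val total = max - min + 1                       // number of key values to cover
  val parts = math.min(numMappers.toLong, total)  // never more ranges than key values
  val base  = total / parts                       // minimum size of each range
  val extra = total % parts                       // the first `extra` ranges get one more key
  (0L until parts).map { i =>
    val lower = min + i * base + math.min(i, extra)
    val size  = base + (if (i < extra) 1 else 0)
    (lower, lower + size - 1)
  }
}

// Hypothetical helper: one bounded query per mapper, in the style shown above.
def splitQueries(table: String, key: String, min: Long, max: Long, numMappers: Int): Seq[String] =
  splitRanges(min, max, numMappers).map { case (lo, hi) =>
    s"select * from $table where $key >= $lo and $key <= $hi"
  }
```

Calling `splitQueries("mytable", "mykey", 1, 100, 5)` yields five queries, the first two being the ones shown above.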
This totally made sense. Spark was not working properly because it had no way of knowing how to split the data between its tasks, so it read the whole table as a single partition.
So it was time to implement the same logic with Spark. This meant my code had to do the following to make Spark work properly:
- Fetch the primary key of the table.
- Find the key's minimum and maximum values.
- Execute Spark with those values.
This is the code I ended up with:
import org.apache.spark.sql.{SaveMode, SparkSession}

def main(args: Array[String]): Unit = {
  // parsing input parameters ...

  // executeQuery is a helper that is not shown here.
  // Column 5 of the SHOW KEYS output is Column_name.
  val primaryKey = executeQuery(url, user, password, s"SHOW KEYS FROM ${config("schema")}.${config("table")} WHERE Key_name = 'PRIMARY'").getString(5)
  val result = executeQuery(url, user, password, s"SELECT MIN(${primaryKey}), MAX(${primaryKey}) FROM ${config("schema")}.${config("table")}")
  val min = result.getString(1).toInt
  val max = result.getString(2).toInt
  // Roughly one partition per 5000 key values.
  val numPartitions = (max - min) / 5000 + 1

  val spark = SparkSession.builder().appName("Spark reading jdbc").getOrCreate()
  var df = spark.read.format("jdbc").
    option("url", s"${url}${config("schema")}").
    option("driver", "com.mysql.jdbc.Driver").
    option("lowerBound", min).
    option("upperBound", max).
    option("numPartitions", numPartitions).
    option("partitionColumn", primaryKey).
    option("dbtable", config("table")).
    option("user", user).
    option("password", password).load()

  // some data manipulations here ...

  // Reduce to 10 partitions before writing to avoid many small files.
  df.repartition(10).write.mode(SaveMode.Overwrite).parquet(outputPath)
}
And it worked perfectly.
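For the curious, this is roughly what Spark does with those options, as far as I understand its JDBC source: it derives a stride from `lowerBound`, `upperBound`, and `numPartitions`, and gives each partition its own WHERE clause. The sketch below is a simplified illustration in plain Scala. `partitionPredicates` is a hypothetical name, not a Spark API, and the exact stride calculation may differ between Spark versions.

```scala
// Simplified illustration (an assumption, not Spark's actual source) of how
// the partitioning options can be turned into one WHERE clause per partition.
def partitionPredicates(column: String, lowerBound: Long, upperBound: Long, numPartitions: Int): Seq[String] = {
  require(numPartitions > 1, "with a single partition no predicate is needed")
  require(upperBound - lowerBound >= numPartitions, "sketch assumes at least one key value per partition")
  // Width of the key range handled by each partition.
  val stride = upperBound / numPartitions - lowerBound / numPartitions
  (0 until numPartitions).map { i =>
    val from = lowerBound + i * stride
    val to   = from + stride
    if (i == 0) s"$column < $to or $column is null"   // first partition is open at the bottom
    else if (i == numPartitions - 1) s"$column >= $from" // last partition is open at the top
    else s"$column >= $from AND $column < $to"
  }
}
```

Note that in this scheme the bounds only decide the stride; they do not filter rows. Keys below `lowerBound` or above `upperBound` are still read, they just all land in the first or last partition.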
Remarks:
- The numPartitions I set for Spark is just a value I found to give good results for the number of rows. It can be changed, since the size of the data is also affected by the column sizes and data types, of course.
- The repartition action at the end is there to avoid having many small files.
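If you want to play with the partition count, a small helper like the one below may be handy. It keeps my "one partition per 5000 key values" rule but adds an upper limit, since numPartitions also bounds how many JDBC connections Spark opens against the database at once. The name `choosePartitions` and the default cap of 200 are assumptions for illustration, not recommended values.

```scala
// Hypothetical helper: derive numPartitions from the key range, capped so
// that a huge table does not open an excessive number of JDBC connections.
def choosePartitions(min: Long, max: Long, keysPerPartition: Long = 5000L, maxPartitions: Int = 200): Int = {
  require(max >= min, "max must not be smaller than min")
  require(keysPerPartition > 0 && maxPartitions > 0, "sizes must be positive")
  val wanted = (max - min) / keysPerPartition + 1  // same formula as in the code above
  math.min(wanted, maxPartitions.toLong).toInt
}
```

The result can be passed straight to `option("numPartitions", ...)` in place of the inline calculation.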
Published at DZone with permission of Avi Yehuda, DZone MVB. See the original article here.