Spark Job Optimization
Spark jobs can be optimized to maximize resource utilization in a cluster, improving performance and reducing costs for large-scale data processing.
We are living in an age where data is of utmost importance, whether for analysis, reporting, or training data for LLMs. The amount of data we capture in every field is growing exponentially, which calls for a technology that can process large volumes of data in a short time. One such technology is Apache Spark.
Apache Spark is a cluster-based processing engine with APIs in Python, Scala, Java, and SQL (Spark SQL), which makes it versatile and easy to fit into most applications.
Many data science and data engineering teams use Spark to process big data. Whether it is Databricks (managed Spark) or a manually configured Spark cluster, running jobs without optimization can lead to high cloud costs in the long run. Hence, optimization should not be treated as an optional step.
Why Optimize?
With an unlimited budget, we could keep increasing the cluster size to gain processing speed, but relying on the default configuration means paying far more than necessary for the same work.
Even though Spark will run a job with its default options, tuning those options can greatly improve performance. Instead of adding more nodes, we should concentrate on fully utilizing every core and every gigabyte of memory we already pay for.
There are many options that can be configured when submitting a Spark job, but the basic calculations below will already greatly improve job performance.
Below is a sample spark-submit job. Let's go through how to calculate resource allocation for a Spark cluster consisting of 1 master and 8 nodes. The instance type used for both the master and the nodes is r5.4xlarge, where each machine has 16 vCPUs and 128 GB of memory.
Note: The master has 16 cores and 128 GB of memory, which can be allocated to the driver; for our example, we are allocating 8 cores and 100 GB of memory for the driver.
spark-submit --master yarn \
  --num-executors <executors per instance * number of nodes> \
  --executor-cores <spark.executor.cores> \
  --executor-memory <spark.executor.memory> \
  --driver-cores 8 --driver-memory 100G
The general recommendation is 5 cores per executor for optimal performance, but in practice anywhere from 3 to 5 cores per executor works well, depending on the node size and the data size.
For all our calculations, we reserve 1 core and 1 GB of memory per node for Hadoop/OS daemon processes.
spark.executor.cores = 3                  (number of cores per executor)
executors per instance = (16 - 1) / 3 = 5 executors
total available executors = 5 * 8 = 40
total available cores for the EMR cluster = spark.executor.cores * executors per instance * number of core nodes
                                          = 3 * 5 * 8 = 120
total available memory per executor = (128 GB - 1 GB) / 5 = 25.4 GB
spark.yarn.executor.memoryOverhead = 2.5 GB   (i.e., 10% of the memory available per executor)
spark.executor.memory = (total available memory per executor) * 0.90
                      = 25.4 * 0.90 = 22.86 GB
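To keep these numbers consistent as the cluster changes, the same arithmetic can be wrapped in a small script. The following is a minimal Python sketch of the calculation above; the node count, the per-node specs, and the 1 core / 1 GB reservation are just the assumptions from this example, not fixed rules.

# Minimal sketch of the executor sizing arithmetic used above.
# Assumptions: 8 worker nodes of r5.4xlarge (16 vCPUs, 128 GB each),
# 1 core and 1 GB per node reserved for Hadoop/OS daemons,
# 3 cores per executor, 10% of executor memory set aside as overhead.

def size_executors(nodes=8, vcpus_per_node=16, mem_gb_per_node=128,
                   cores_per_executor=3, reserved_cores=1, reserved_mem_gb=1,
                   overhead_fraction=0.10):
    executors_per_node = (vcpus_per_node - reserved_cores) // cores_per_executor
    total_executors = executors_per_node * nodes
    total_cores = total_executors * cores_per_executor
    mem_per_executor = (mem_gb_per_node - reserved_mem_gb) / executors_per_node
    return {
        "executors_per_node": executors_per_node,                       # 5
        "total_executors": total_executors,                             # 40
        "total_cores": total_cores,                                     # 120
        "executor_memory_gb": round(mem_per_executor * (1 - overhead_fraction), 2),  # 22.86
        "memory_overhead_gb": round(mem_per_executor * overhead_fraction, 2),        # ~2.5
    }

print(size_executors())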
/* updating the spark-submit based on the above calculations */
spark-submit --master yarn --num-executors 40 --executor-cores 3 \
  --executor-memory 22G --driver-cores 8 --driver-memory 100G
After performing the above calculations and accounting for all the available resources, the spark-submit command looks as shown above. We are now using all the available CPU cores and memory, keeping full control of the cluster.
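If the job is launched from PySpark rather than spark-submit, the same values can be expressed as standard Spark configuration properties. This is only a sketch: settings such as driver memory and executor instances generally need to be applied before the application starts (for example in spark-defaults.conf or on spark-submit), so do not treat it as a drop-in replacement. The application name below is hypothetical.

from pyspark.sql import SparkSession

# Sketch: the same resource settings as the tuned spark-submit above,
# expressed as Spark configuration properties.
spark = (
    SparkSession.builder
    .appName("optimized-job")                                # hypothetical app name
    .config("spark.executor.instances", "40")
    .config("spark.executor.cores", "3")
    .config("spark.executor.memory", "22g")
    .config("spark.yarn.executor.memoryOverhead", "2560m")   # ~10% of executor memory
    .config("spark.driver.cores", "8")
    .config("spark.driver.memory", "100g")
    .getOrCreate()
)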
In the above example, we are using 3 cores per executor, which means each executor can run 3 tasks in parallel.
If we are dealing with small amounts of data that don't require parallel processing within an executor, then having 1 core per executor can be beneficial: on the same cluster we would run 120 executors instead of 40, trading parallelism per executor for a larger number of independent executors (see the sizing check below).
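As a quick check of that claim, reusing the same 1 core / 1 GB per-node reservation as above:

# Sizing check with 1 core per executor on the same 8-node cluster.
cores_per_executor = 1
executors_per_node = (16 - 1) // cores_per_executor     # 15 executors per node
total_executors = executors_per_node * 8                # 120 executors
memory_per_executor = (128 - 1) / executors_per_node    # ~8.5 GB each, before overhead
print(total_executors, round(memory_per_executor, 1))

Note that each executor now gets only about 8.5 GB before overhead, so this trade-off only makes sense when individual tasks are small.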
Note: The options below are worth checking when you run into Java heap errors, but they are not mandatory.
spark.driver.memory = spark.executor.memory (recommended)
spark.driver.cores = spark.executor.cores
spark.executor.instances = (executors per instance * number of core nodes) - 1 for the driver
                         = (5 * 8) - 1 = 39
spark.default.parallelism (generally 2-3 tasks per CPU core)
                         = spark.executor.instances * spark.executor.cores * 2
                         = 39 * 3 * 2 = 234
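As a sketch of how the derived value can be applied: spark.default.parallelism covers RDD operations, while spark.sql.shuffle.partitions (default 200) is the analogous knob for DataFrame/Spark SQL shuffles.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# spark.sql.shuffle.partitions controls the number of partitions for
# DataFrame/Spark SQL shuffles and can be changed at runtime:
spark.conf.set("spark.sql.shuffle.partitions", "234")

# spark.default.parallelism (used by RDD operations) must be set before the
# SparkContext starts, e.g. --conf spark.default.parallelism=234 on spark-submit.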
Based on the above calculations, we should be able to use the Spark cluster far more efficiently. Left with the default configuration, the spark-submit command would end up under-utilizing the available resources.
For example, the optimized cluster above uses a total of 9 virtual machines (1 master + 8 nodes). If each virtual machine is charged at $1 per hour, the costs would be the following:
- Total charge per hour: 9 VMs × $1 = $9
- Per day: $9 × 24 = $216
- Per year: $216 × 365 = $78,840
One additional virtual machine will add $8,760 per year.
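The same back-of-the-envelope math in a few lines of Python (the $1/hour rate is just the assumption used in this example):

# Back-of-the-envelope cluster cost, assuming $1/hour per VM.
vms, rate_per_hour = 9, 1.0
per_hour = vms * rate_per_hour                 # $9
per_day = per_hour * 24                        # $216
per_year = per_day * 365                       # $78,840
extra_vm_per_year = rate_per_hour * 24 * 365   # $8,760 for each additional VM
print(per_hour, per_day, per_year, extra_vm_per_year)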
So, if we want faster processing and take the easy route of simply adding resources, the cost of virtual machines will quickly get out of control. Even though optimization takes a little effort up front, it saves a great deal of cost while getting the best possible outcome from the resources we pay for.
Optimizing the Spark job ensures that we utilize all the resources effectively and achieve the required performance at the lowest possible cost.