Your First Java RDD With Apache Spark
Build a simple Spark RDD with the Java API.
Apache Spark is an amazingly powerful parallel execution engine for processing big data, including mining, crunching, analyzing, and representing it. If you want to know more about the features and ecosystem of this framework, check out the official docs. For now, I'm going to assume you're already sold and want to get started immediately.
Maven Project
Start a new Maven project with your favorite IDE or from the command line. You must have Java 8 installed on your operating system. The Spark version used here (2.4.6) does not support newer versions of Java, but it is awesome, trust me. When that is done, here is how you want your pom.xml file to look:
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>spark_first</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>
    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.4.6</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.4.6</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>3.2.1</version>
        </dependency>
    </dependencies>
</project>
We have three important dependencies. Spark Core is built here against Scala 2.11. You also have the option to use the Scala 2.12 build; it makes no difference for this guide. At the time of writing, the latest version of Scala is 2.13, but Spark does not support it yet. Spark Core is the main Spark engine, which you use to build your RDDs. Spark SQL provides an interface to perform complex SQL operations on your dataset with ease. Hadoop HDFS provides a distributed file system implementation, which Spark is designed to work with.
Note the project properties: you need to set your compiler and runtime VM to Java 8. Now let's look at the code.
Code
Under your src/main/java folder, create a new Java file with a main method like so:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.ArrayList;
import java.util.List;

public class MainRunner {
    public static void main(String[] args) {
        // Sample data that will back the RDD
        List<Double> doubleList = new ArrayList<>();
        doubleList.add(23.44);
        doubleList.add(26.43);
        doubleList.add(75.35);
        doubleList.add(245.767);
        doubleList.add(398.445);
        doubleList.add(94.72);

        // Run Spark locally, with one worker thread per available core
        SparkConf sparkConf = new SparkConf().setAppName("spark_first").setMaster("local[*]");
        JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

        // Distribute the local list as an RDD
        JavaRDD<Double> javaRDD = sparkContext.parallelize(doubleList);

        // map: round each Double to the nearest Integer
        JavaRDD<Integer> mappedRDD = javaRDD.map(val -> (int) Math.round(val));
        mappedRDD.collect().forEach(System.out::println);

        // reduce: sum all the Integers
        int reducedResult = mappedRDD.reduce(Integer::sum);
        System.out.println(reducedResult);

        // Shut down the Spark context
        sparkContext.close();
    }
}
Our RDD is built from a simple ArrayList of Doubles, but you can also load a dataset from your local file system or from a distributed file system like HDFS or Amazon S3. Spark's Java API has functions to help you do that, such as JavaSparkContext.textFile. Check it out and play around with that code.
The code performs a simple map-reduce: it maps the RDD of Doubles to a new RDD of Integers by rounding each value, then reduces that RDD with Integer::sum to return the sum of its elements.
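To sanity-check the numbers before running the job, the same two steps can be sketched in plain Java with no Spark involved. This is only a local stand-in using java.util.stream, not how Spark executes the work; the class name LocalMapReduceCheck and its helper methods are hypothetical and exist purely for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java stand-in for the Spark job (hypothetical class, not part of the project).
class LocalMapReduceCheck {

    // Same lambda as the map step in MainRunner: round each Double to the nearest int.
    static List<Integer> roundAll(List<Double> values) {
        return values.stream()
                .map(val -> (int) Math.round(val))
                .collect(Collectors.toList());
    }

    // Same binary function as the reduce step in MainRunner, starting from 0.
    static int sum(List<Integer> values) {
        return values.stream().reduce(0, Integer::sum);
    }

    public static void main(String[] args) {
        List<Double> doubleList = Arrays.asList(23.44, 26.43, 75.35, 245.767, 398.445, 94.72);
        List<Integer> rounded = roundAll(doubleList);
        System.out.println(rounded);      // [23, 26, 75, 246, 398, 95]
        System.out.println(sum(rounded)); // 863
    }
}
```

The rounded values and the total should match what the Spark job prints, since both versions apply the same rounding lambda and the same Integer::sum function to the same six inputs.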
If you run your project, your results should look something like this:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
20/06/14 20:32:45 INFO SparkContext: Running Spark version 2.4.6
20/06/14 20:32:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
20/06/14 20:32:53 INFO SparkContext: Submitted application: spark_first
20/06/14 20:32:55 INFO SecurityManager: Changing view acls to: Terrence
20/06/14 20:32:55 INFO SecurityManager: Changing modify acls to: Terrence
...
20/06/14 20:33:23 INFO DAGScheduler: Job 0 finished: collect at MainRunner.java:29, took 6.834479 s
23
26
75
246
398
95
20/06/14 20:33:23 INFO SparkContext: Starting job: reduce at MainRunner.java:32
...
20/06/14 20:33:24 INFO DAGScheduler: ResultStage 1 (reduce at MainRunner.java:32) finished in 0.640 s
20/06/14 20:33:24 INFO DAGScheduler: Job 1 finished: reduce at MainRunner.java:32, took 0.754748 s
863
20/06/14 20:33:24 INFO SparkUI: Stopped Spark web UI at http://Terrence.mshome.net:4040
Process finished with exit code 0
There will be a lot of verbose logs. You might want to quiet them by explicitly setting the Log4j log level to WARN or ERROR. Lines 9-14 of the output show our new Integer RDD, and line 19 shows our reduced sum.
Congratulations. You have just run your first Apache Spark project with Java. Time to get your hands dirty.
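One way to lower the log level without touching the Java code is a Log4j configuration file, sketched here as an alternative to setting the level programmatically. It assumes the Log4j 1.x setup that Spark 2.4.x ships with, and that you save the file as src/main/resources/log4j.properties so it lands on the classpath. The appender layout is modeled on Spark's own default template, with only the root level changed to WARN.

```properties
# Assumed location: src/main/resources/log4j.properties
# Log only WARN and above to the console
log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
```

With this file in place, the INFO lines should disappear while the values printed by the program itself remain. Change WARN to ERROR to hide warnings as well.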