Introduction to SparkSession
We go over how to use this new feature of Apache Spark 2.0, covering all the Scala and SQL code you'll need to get started.
Spark 2.0 is the next major release of Apache Spark. It brings major changes to the level of abstraction for the Spark API and libraries. In this blog post, I’ll be discussing SparkSession.
Intro to SparkSession
Before getting into SparkSession, let's understand the entry point. An entry point is where control is transferred from the operating system to the provided program. Apache Spark is a powerful cluster computing engine designed for fast computation on big data, and before 2.0 the entry point to Spark Core was sparkContext.
sparkContext in Apache Spark
An important step for any Spark driver application is to create a sparkContext. It allows your Spark application to access the Spark cluster with the help of the resource manager. The resource manager can be one of these three:
Spark Standalone
YARN
Apache Mesos
Functions of sparkContext in Apache Spark
Get the current status of your Spark application.
Set configurations.
Access various services.
Cancel a job.
Cancel a stage.
Closure cleaning.
Register SparkListener.
Programmable dynamic allocation.
Access persistent RDD.
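Most of these operations are exposed directly on the sparkContext instance. A minimal sketch of a few of them (assuming a sparkContext named sc has already been created, as in the example further below):
sc.setLogLevel("WARN")                              // set configurations (here, the log level)
val activeJobs = sc.statusTracker.getActiveJobIds() // current status of the Spark application
val cachedRdds = sc.getPersistentRDDs               // access persistent (cached) RDDs
sc.cancelAllJobs()                                  // cancel running jobs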
Prior to Spark 2.0, sparkContext was used as the channel to access all Spark functionality. The Spark driver program uses sparkContext to connect to the cluster through the resource manager.
A SparkConf is required to create a sparkContext object; it stores configuration parameters such as appName (to identify your Spark driver), the number of cores, and the memory size of the executors running on worker nodes.
In order to use the SQL APIs, Hive, and streaming, separate contexts need to be created.
Example:
import org.apache.spark.{SparkConf, SparkContext}

// Build the configuration, then use it to create the sparkContext
val conf = new SparkConf()
.setMaster("local")
.setAppName("Spark Practice2")
val sc = new SparkContext(conf)
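And, as noted above, the other APIs needed their own contexts layered on top of sc. A rough Spark 1.x-style sketch (the batch interval here is illustrative):
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sqlContext = new SQLContext(sc)            // for the SQL APIs
val hiveContext = new HiveContext(sc)          // for Hive integration
val ssc = new StreamingContext(sc, Seconds(1)) // for streaming, with a 1-second batch interval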
SparkSession – New entry-point of Spark
As we know, in previous versions, sparkContext was the entry point for Spark. As RDD was the main API, it was created and manipulated using sparkContext's APIs. For every other API, we needed to use a different context.
For streaming, we needed streamingContext; for SQL, sqlContext; and for Hive, hiveContext. But as the DataSet and DataFrame APIs become the new standard, they need an entry point of their own. So in Spark 2.0, we have a new entry point built for the DataSet and DataFrame APIs, called SparkSession.
It's a combination of SQLContext, HiveContext, and streamingContext. All the APIs available on those contexts are available on SparkSession; SparkSession also has a sparkContext for actual computation.
Now we can look at how to create a SparkSession and interact with it.
Creating a SparkSession
The following code comes in handy when you want to create a SparkSession:
val spark = SparkSession.builder()
.master("local")
.appName("example of SparkSession")
.config("spark.some.config.option", "some-value")
.getOrCreate()
SparkSession.builder()
Returns a builder object used to construct the SparkSession.
master(“local”)
Sets the Spark master URL to connect to:
“local” to run locally
“local[4]” to run locally with 4 cores
“spark://master:7077” to run on a Spark Standalone cluster
appName()
Sets a name for the application, which will be shown in the Spark web UI.
If no application name is set, a randomly generated name will be used.
config()
Sets a config option. Options set using this method are automatically propagated to both the SparkConf and the SparkSession's own configuration. Its arguments consist of key-value pairs.
getOrCreate()
Gets an existing SparkSession or creates a new one. It first checks whether there is a valid thread-local SparkSession and, if so, returns that one. It then checks whether there is a valid global default SparkSession and, if so, returns that one. If no valid global default exists, the method creates a new SparkSession and assigns it as the global default.
If an existing SparkSession is returned, the config options specified in this builder are applied to that existing SparkSession.
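A small illustration of that behavior (a sketch; both calls run in the same JVM):
import org.apache.spark.sql.SparkSession

val first = SparkSession.builder().master("local").appName("example of SparkSession").getOrCreate()
val second = SparkSession.builder().getOrCreate() // returns the existing session rather than creating a new one
println(first eq second)                          // true: both references point to the same SparkSession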
The above is similar to creating a SparkContext with master set to local and then creating an SQLContext wrapping it. If you need Hive support, you can use the code below to create a SparkSession with Hive support enabled:
val spark = SparkSession.builder()
.master("local")
.appName("example of SparkSession")
.config("spark.some.config.option", "some-value")
.enableHiveSupport()
.getOrCreate()
enableHiveSupport on the builder enables Hive support, making the resulting SparkSession similar to a HiveContext. We can then use the created SparkSession to read data.
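For example, with Hive support enabled, the session can query tables registered in the Hive metastore (the table name here is purely hypothetical):
val people = spark.table("people")                // read a Hive table as a DataFrame
spark.sql("SELECT * FROM people LIMIT 10").show() // or query it with SQL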
Read Data Using SparkSession
SparkSession is the entry point for reading data, similar to the old SQLContext.read.
From Spark 2.0 onwards, it is better to use SparkSession, as it provides access to all of the functionality that sparkContext provides, and it also offers APIs to work with DataFrames and DataSets. The code below reads data from a CSV file using SparkSession.
val df = spark.read.format("com.databricks.spark.csv")
.schema(customSchema)
.load("data.csv")
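The customSchema referenced above is a user-defined StructType. A minimal sketch of how it might be defined for the timestamp/temperature data shown below (column types are assumptions):
import org.apache.spark.sql.types._

val customSchema = StructType(Seq(
  StructField("TimeStamp", StringType, nullable = true),
  StructField("Temperature", DoubleType, nullable = true),
  StructField("date", StringType, nullable = true),
  StructField("Time", StringType, nullable = true)
))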
Running SQL Queries
SparkSession can be used to execute SQL queries over data, getting the result back as a DataFrame (i.e. a Dataset[Row]).
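For the query below to find a table named TimeStamp, the DataFrame read earlier would first be registered as a temporary view, for example:
df.createOrReplaceTempView("TimeStamp") // expose the DataFrame to SQL under the name "TimeStamp"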
display(spark.sql("Select * from TimeStamp"))
+--------------------+-----------+----------+-----+
| TimeStamp|Temperature| date| Time|
+--------------------+-----------+----------+-----+
|2010-02-25T05:42:...| 79.48|2010-02-25|05:42|
|2010-02-25T05:42:...| 59.27|2010-02-25|05:42|
|2010-02-25T05:42:...| 97.98|2010-02-25|05:42|
|2010-02-25T05:42:...| 91.41|2010-02-25|05:42|
|2010-02-25T05:42:...| 60.67|2010-02-25|05:42|
|2010-02-25T05:42:...| 61.41|2010-02-25|05:42|
|2010-02-25T05:42:...| 93.6|2010-02-25|05:42|
|2010-02-25T05:42:...| 50.32|2010-02-25|05:42|
|2010-02-25T05:42:...| 64.69|2010-02-25|05:42|
|2010-02-25T05:42:...| 78.57|2010-02-25|05:42|
|2010-02-25T05:42:...| 66.89|2010-02-25|05:42|
|2010-02-25T05:42:...| 62.87|2010-02-25|05:42|
|2010-02-25T05:42:...| 74.32|2010-02-25|05:42|
|2010-02-25T05:42:...| 96.55|2010-02-25|05:42|
|2010-02-25T05:42:...| 71.93|2010-02-25|05:42|
|2010-02-25T05:42:...| 79.17|2010-02-25|05:42|
|2010-02-25T05:42:...| 73.89|2010-02-25|05:42|
|2010-02-25T05:42:...| 80.97|2010-02-25|05:42|
|2010-02-25T05:42:...| 81.04|2010-02-25|05:42|
|2010-02-25T05:42:...| 53.05|2010-02-25|05:42|
+--------------------+-----------+----------+-----+
Note: Only showing top 20 rows.
Working With Config Options
SparkSession can also be used to set runtime configuration options which can toggle the optimizer behavior or I/O (i.e. Hadoop) behavior.
spark.conf.set("spark.some.config", "abcd")
spark.conf.get("spark.some.config")
Config options set can also be used in SQL using variable substitution.
%sql select "${spark.some.config}"
Working With Metadata Directly
SparkSession also includes a catalog method that exposes APIs to work with the metastore (i.e. the data catalog). These methods return DataSets, so you can use the same DataSet API to play with them.
To get a list of tables in the current database, use the following code:
val tables = spark.catalog.listTables()
display(tables)
+----+--------+-----------+---------+-----------+
|name|database|description|tableType|isTemporary|
+----+--------+-----------+---------+-----------+
|Stu |default |null |Managed |false |
+----+--------+-----------+---------+-----------+
Use the DataSet API to filter on names:
display(tables.filter(_.name contains "son"))
+----+--------+-----------+---------+-----------+
|name|database|description|tableType|isTemporary|
+----+--------+-----------+---------+-----------+
|Stu |default |null |Managed |false |
+----+--------+-----------+---------+-----------+
Get the list of columns for a table:
display(spark.catalog.listColumns("smart"))
+-----+-----------+--------+--------+-------------+--------+
|name |description|dataType|nullable|isPartitioned|isBucket|
+-----+-----------+--------+--------+-------------+--------+
|email|null       |string  |true    |false        |false   |
|iq   |null       |bigint  |true    |false        |false   |
+-----+-----------+--------+--------+-------------+--------+
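The catalog exposes similar listings for databases and functions, which also come back as DataSets (a quick sketch):
display(spark.catalog.listDatabases()) // databases visible to this session
display(spark.catalog.listFunctions()) // built-in and registered functions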
Access the Underlying SparkContext
SparkSession.sparkContext returns the underlying sparkContext, used for creating RDDs as well as managing cluster resources.
spark.sparkContext
res17: org.apache.spark.SparkContext = org.apache.spark.SparkContext@2debe9ac
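A quick sketch of using that underlying context to create an RDD directly from the session:
val rdd = spark.sparkContext.parallelize(1 to 100) // create an RDD from a local collection
rdd.count()                                        // res: Long = 100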