Introduction to Apache Spark's Core API (Part I)
We take a quick look at how to work with the functions and methods contained in Spark's core API using Python.
Hello coders, I hope you are all doing well.
Over the past few months, I've been learning the Spark framework along with other big data topics. Spark is essentially a cluster computing framework, though I know a single phrase can't define the entire framework.
In this post, I am going to discuss the core APIs of Apache Spark using Python as the programming language. I am assuming that you have basic knowledge of the Spark framework (tuples, RDDs, pair RDDs, and DataFrames) and of its initialization steps.
When we launch a Spark shell, in either Scala or Python (i.e. spark-shell or pyspark), it initializes a SparkContext as sc and a SQLContext as sqlContext.
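If you run a standalone PySpark script instead of the interactive shell, these objects are not created for you. Here is a minimal sketch of how they can be set up, assuming the Spark 1.x-style API that matches the sc/sqlContext naming used in this post (the application name below is just a placeholder):

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

conf = SparkConf().setAppName('core-api-demo')  # placeholder application name
sc = SparkContext(conf=conf)                    # plays the role of the shell's sc
sqlContext = SQLContext(sc)                     # plays the role of the shell's sqlContext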
- Core APIs
- sc.textFile(path)
- This method reads a text file from HDFS (or any other Hadoop-supported file system) and returns it as an RDD of strings.
ordersRDD = sc.textFile('orders')
- rdd.first()
- This method returns the first element in the RDD.
ordersRDD.first() # u'1,2013-07-25 00:00:00.0,11599,CLOSED' - first element of the ordersRDD
- rdd.collect()
- This method returns a list that contains all of the elements in the RDD. Note that it brings the entire RDD to the driver, so it should only be used on small RDDs.
ordersRDD.collect() # [..., u'68882,2014-07-22 00:00:00.0,10000,ON_HOLD', u'68883,2014-07-23 00:00:00.0,5533,COMPLETE'] - all elements of ordersRDD (last two shown)
- rdd.filter(f)
- This method returns a new RDD containing only the elements that satisfy a predicate, i.e. the elements for which the function given in the argument returns True.
filterdRDD = ordersRDD.filter(lambda line: line.split(',')[3] in ['COMPLETE'])
filterdRDD.first() # u'3,2013-07-25 00:00:00.0,12111,COMPLETE'
- rdd.map(f)
- This method returns a new RDD by applying a function to each element of this RDD, i.e. it transforms the RDD into a new one by applying the given function to every element.
mapedRDD = ordersRDD.map(lambda line: tuple(line.split(',')))
mapedRDD.first() # (u'1', u'2013-07-25 00:00:00.0', u'11599', u'CLOSED')
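Transformations like filter and map can also be chained. As a small illustrative example (not from the original snippets, but assuming the same ordersRDD and data layout as above), we can keep only the COMPLETE orders and then extract their dates:

completeDates = ordersRDD.filter(lambda line: line.split(',')[3] == 'COMPLETE').map(lambda line: line.split(',')[1])
completeDates.first() # u'2013-07-25 00:00:00.0' - the date of the first COMPLETE order, assuming the same data as above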
- rdd.flatMap(f)
- This method returns a new RDD by first applying a function to each element of this RDD (same as map method) and then flattening the results.
flatMapedRDD = ordersRDD.flatMap(lambda line: line.split(','))
flatMapedRDD.take(4) # [u'1', u'2013-07-25 00:00:00.0', u'11599', u'CLOSED']
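To see the flattening in action, compare the element counts of map and flatMap over the same split (an illustrative check on the ordersRDD above; the exact numbers depend on your data):

ordersRDD.map(lambda line: line.split(',')).count() # one list of 4 fields per order, so the count equals the number of orders
ordersRDD.flatMap(lambda line: line.split(',')).count() # the lists are flattened, so the count is about 4x the number of orders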
- sc.parallelize(c)
- This method distributes a local Python collection to form an RDD.
lRDD = sc.parallelize(range(1, 10))
lRDD.first() # 1
lRDD.take(10) # [1, 2, 3, 4, 5, 6, 7, 8, 9]
- rdd.reduce(f)
- This method reduces the elements of this RDD using the specified commutative and associative binary operator.
lRDD.reduce(lambda x,y: x+y) # 45 - this is the sum of 1 to 9
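Any commutative and associative operator works with reduce; for example, an illustrative way to get the maximum element:

lRDD.reduce(lambda x, y: x if x > y else y) # 9 - the maximum of 1 to 9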
- rdd.count()
- This method returns the number of elements in this RDD.
lRDD.count() # 9 - as there are 9 elements in the lRDD
- rdd.sortBy(keyFunc)
- This method sorts this RDD by the given keyfunc.
lRDD.collect() # [1, 2, 3, 4, 5, 6, 7, 8, 9]
lRDD.sortBy(lambda x: -x).collect() # [9, 8, 7, 6, 5, 4, 3, 2, 1] - the RDD can be sorted either ASC or DESC
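The keyfunc can extract any sort key from the elements. As an illustrative example on the ordersRDD above, we can sort the orders by their status field:

ordersRDD.sortBy(lambda line: line.split(',')[3]).first() # returns the order whose status comes first alphabetically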
- rdd.top(num)
- This method gets the top N elements from an RDD. It returns the list sorted in descending order.
lRDD.top(3) # [9, 8, 7]
- rdd.take(num)
- This method takes the first num elements of the RDD.
lRDD.take(7) # [1, 2, 3, 4, 5, 6, 7]
- rdd.union(otherRDD)
- Return the union of this RDD and another one.
l1 = sc.parallelize(range(1, 5))
l1.collect() # [1, 2, 3, 4]
l2 = sc.parallelize(range(3, 8))
l2.collect() # [3, 4, 5, 6, 7]
lUnion = l1.union(l2)
lUnion.collect() # [1, 2, 3, 4, 3, 4, 5, 6, 7]
- rdd.distinct()
- Return a new RDD containing the distinct elements in this RDD.
lDistinct = lUnion.distinct()
lDistinct.collect() # [2, 4, 6, 1, 3, 5, 7]
- rdd.intersection(otherRDD)
- Return the intersection of this RDD and another one, i.e. the output will not contain any duplicate elements, even if the input RDDs did.
lIntersection = l1.intersection(l2)
lIntersection.collect() # [4, 3]
- rdd.subtract(otherRDD)
- Return each value in this RDD that is not contained in the other one.
lSubtract = l1.subtract(l2)
lSubtract.collect() # [2, 1]
- rdd.saveAsTextFile(path, compressionCodec)
- Save this RDD as a text file, optionally compressing it with the given codec.
lRDD.saveAsTextFile('lRDD_only') # saves lRDD under the lRDD_only folder in the HDFS home directory
lUnion.saveAsTextFile('lRDD_union', 'org.apache.hadoop.io.compress.GzipCodec') # saves lUnion compressed with the Gzip codec under the lRDD_union folder
lSubtract.saveAsTextFile('lRDD_subtract', 'org.apache.hadoop.io.compress.SnappyCodec') # saves lSubtract compressed with the Snappy codec under the lRDD_subtract folder
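A saved RDD can be read back with sc.textFile to verify what was written (illustrative, using the lRDD_only folder created above; note that the values come back as strings):

sc.textFile('lRDD_only').collect() # the saved numbers come back as strings, e.g. [u'1', u'2', ..., u'9']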
- rdd.keyBy(f)
- Creates tuples (a pair RDD) from the elements of this RDD, using the given function to compute the key for each element.
ordersPairRDD = ordersRDD.keyBy(lambda line: int(line.split(',')[0]))
ordersPairRDD.first() # (1, u'1,2013-07-25 00:00:00.0,11599,CLOSED')
# This way we can create a pair RDD.
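Once we have a pair RDD, key-based operations become available (more on these in the next post). As a small illustrative preview, we can look up an order by its key:

ordersPairRDD.lookup(1) # [u'1,2013-07-25 00:00:00.0,11599,CLOSED'] - all values stored under key 1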
For now, these are the main functions and methods for plain RDDs, i.e. RDDs without keys. In my next post, I will explain the functions and methods for pair RDDs, with multiple example snippets.
Thanks for reading and happy coding!