Learn Spark by Examples


In this post, I briefly introduce Spark and use examples to show how to analyze your data with the most common RDD methods. You can refer to this post to set up the pySpark environment with the IPython Notebook.

SparkContext

SparkContext, or the Spark context, is the entry point for developing a Spark application on the Spark infrastructure.

Once a SparkContext object is created, it sets up the internal services and builds a connection to the cluster manager, which manages the actual executors that carry out the computations.

The following diagram from the Spark documentation visualizes the Spark architecture:

[Diagram: Spark cluster architecture – driver program (SparkContext), cluster manager, and worker nodes running executors]

The SparkContext object is usually referenced as the variable sc. It can be used to create RDDs, accumulators, and broadcast variables, and to access Spark services.
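In a pySpark IPython Notebook session, sc is usually created for you. If you run a standalone script instead, a minimal sketch of creating your own SparkContext might look like this (the application name and master URL below are placeholders):

from pyspark import SparkConf, SparkContext

# Placeholder app name and master URL - adjust for your own cluster.
conf = SparkConf().setAppName("learn-spark-examples").setMaster("local[*]")
sc = SparkContext(conf=conf)

# sc can now create RDDs, broadcast variables, and accumulators.
lookup = sc.broadcast({"w1": 1, "w2": 2})  # read-only data shared with executors
counter = sc.accumulator(0)                # executors can only add to this counter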

What is an RDD – Resilient Distributed Dataset

The core data structure in Spark is the RDD: the resilient distributed dataset. In Spark, an RDD represents a dataset that is distributed across the cluster. An RDD object can be thought of as a collection of data elements, which can be tuples, strings, dictionaries, etc.

The data in an RDD can be stored in RAM, or on disk if there is not enough room in RAM. Because the data can be kept in RAM, Spark is especially efficient for iterative computations such as machine learning model training and PageRank calculation.
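As a minimal sketch (reusing the sc from above), you can ask Spark to keep an RDD in memory and spill it to disk only when it does not fit, which is what makes repeated passes over the same data cheap:

from pyspark import StorageLevel

nums = sc.parallelize(range(1000))
# Keep the partitions in RAM, spilling to disk if memory runs out.
nums.persist(StorageLevel.MEMORY_AND_DISK)
# Later passes over nums reuse the cached data instead of recomputing it.
total = nums.reduce(lambda a, b: a + b)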

Similar to DataFrames in Pandas, a dataset can be turned into an RDD, and then you can run any of the methods associated with it.

Learning Spark through Examples 

Transfer your dataset into an RDD

Suppose your data is stored in a file, which can be a local file or an HDFS file. You can use sc.textFile(file_path) to create an RDD from the data.

In [1]:
rdd = sc.textFile("file:///homes/user/test.txt")
rdd.take(2)
Out[1]:
[u'0.1\t0.11', u'0.1\t0.1000001']
 

Use the parallelize method to transfer an iterable into an RDD

In [3]:
ints = sc.parallelize(range(10))
In [4]:
ints.take(11)
Out[4]:
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
 

Use the RDD map method to generate a new RDD from an existing one

In [5]:
squares = ints.map(lambda num: num * num)
squares.take(11)
Out[5]:
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
 

Use RDD filter Method

In [6]:
even = ints.filter(lambda num: num % 2 == 0)
even.take(11)
Out[6]:
[0, 2, 4, 6, 8]
 

Use the reduce method

Note that reduce is an action: it aggregates the elements of the RDD and returns the result to the driver as a plain value, not as a new RDD.

In [7]:
total = ints.reduce(lambda a, b: a + b)
total
Out[7]:
45
 

Use flatMap Method

The flatMap method is useful when you want to flatten nested collections into a single RDD of individual elements. In the following example, each element of the initial RDD is a tuple of words; after we call flatMap, we get a new RDD of individual words. This is useful for counting word frequencies:

In [8]:
docsRdd = sc.parallelize([('w1', 'w2', 'w3'), ('w2', 'w3'), ('w4', 'w5'), ('w2', 'w4')])
docsRdd.take(4)
Out[8]:
[('w1', 'w2', 'w3'), ('w2', 'w3'), ('w4', 'w5'), ('w2', 'w4')]
In [9]:
wordsRdd = docsRdd.flatMap(lambda line: line)
wordsRdd.take(10)
Out[9]:
['w1', 'w2', 'w3', 'w2', 'w3', 'w4', 'w5', 'w2', 'w4']
 

Use RDD reduceByKey method

To calculate the frequency of each word, we map each word to a (word, 1) pair and then sum the counts with the reduceByKey method:

In [10]:
freqRdd = wordsRdd.map(lambda word: (word, 1)).reduceByKey(lambda wfa, wfb: (wfa + wfb))
freqRdd.take(10)
Out[10]:
[('w5', 1), ('w3', 2), ('w1', 1), ('w4', 2), ('w2', 3)]
 

Sort the word list by frequency

In [11]:
sortedRdd = freqRdd.sortBy(lambda kv: kv[1], ascending=False)
sortedRdd.take(10)
Out[11]:
[('w2', 3), ('w3', 2), ('w4', 2), ('w5', 1), ('w1', 1)]
 

Sort the word list alphabetically

In [13]:
sortAlpha = freqRdd.sortByKey()
sortAlpha.take(10)
Out[13]:
[('w1', 1), ('w2', 3), ('w3', 2), ('w4', 2), ('w5', 1)]

In the next post, I will describe other Spark RDD methods.