Learn Spark by Examples (2)


In the previous post, we introduced Spark, RDDs, and how to use RDDs for basic data analysis. In this post, I will show more examples of how to use RDD methods.

Spark RDD reduceByKey Method

We have already used reduceByKey to solve the word-frequency problem. Here I will use a more complicated example to show how to use it.

Suppose we have a set of tweets, each shared by different users. We also give each user a weight denoting his or her importance.

Here is an example of the data set.

dataset = """
t1,2, u1, 0.8 \n
t1,3, u2, 0.1 \n
t1,4, u3, 0.3 \n
t1,5, u4, 0.4 \n
t2,3, u1, 0.8 \n
t2,10, u2, 0.3
"""

The first line, t1,2, u1, 0.8, means tweet t1 was shared 2 times by user u1, whose weight is 0.8. Given this kind of data, we can calculate each tweet's average number of shares per user to represent the importance of the tweet.

Without using the users' weights, the average share count for t1 is: (2 + 3 + 4 + 5) / 4 = 14 / 4 = 3.5.

If we take the users' weights into account, the weighted average share count for t1 is: (2 * 0.8 + 3 * 0.1 + 4 * 0.3 + 5 * 0.4) / (0.8 + 0.1 + 0.3 + 0.4) = 5.1 / 1.6 = 3.1875.

The weighted average is smaller because user u1, who has a much higher weight than the other users, shares t1 only 2 times.

If we use the average share count to represent a tweet's popularity, the weighted average is the more reasonable measure, since a share by an important user reaches more people.

Here I will use reduceByKey to calculate both the unweighted and the weighted average share count. Since reduceByKey merges values pairwise, the weighted version pre-multiplies each share count by the user's weight in the map step, so the reduce step only needs to sum the partial results.

In [1]:
dataset = """
t1,2, u1, 0.8 \n
t1,3, u2, 0.1 \n
t1,4, u3, 0.3 \n
t1,5, u4, 0.4 \n
t2,3, u1, 0.8 \n
t2,10, u2, 0.3
"""
data = sc.parallelize(dataset.strip().split("\n")).filter(lambda line: line)
data = data.map(lambda line: line.strip().split(','))
data.take(6)
Out[1]:
[['t1', '2', ' u1', ' 0.8'],
 ['t1', '3', ' u2', ' 0.1'],
 ['t1', '4', ' u3', ' 0.3'],
 ['t1', '5', ' u4', ' 0.4'],
 ['t2', '3', ' u1', ' 0.8'],
 ['t2', '10', ' u2', ' 0.3']]
In [2]:
unweighted = data.map(lambda (t, freq, u, w): (t, (int(freq), 1))).\
             reduceByKey(lambda (f1, n1), (f2, n2): (f1 + f2, n1 + n2))
unweighted.take(2)
Out[2]:
[('t2', (13, 2)), ('t1', (14, 4))]
In [3]:
unweightedRes = unweighted.map(lambda (t, (totalFreq, totalUser)): (t, totalFreq * 1.0 / totalUser))
unweightedRes.take(2)
Out[3]:
[('t2', 6.5), ('t1', 3.5)]
In [4]:
# pre-multiply each share count by its user's weight in map, so the reduce step is a plain pairwise sum
weighted = data.map(lambda (t, freq, u, w): (t, (int(freq) * float(w), float(w)))).\
           reduceByKey(lambda (s1, w1), (s2, w2): (s1 + s2, w1 + w2))
weighted.take(2)  
Out[4]:
[('t2', (5.4, 1.1)), ('t1', (5.1, 1.6))]
In [5]:
weightedRes = weighted.map(lambda (t, (weightedShare, totalWeight)): (t, weightedShare / totalWeight))
weightedRes.take(2)
Out[5]:
[('t2', 4.909090909090909), ('t1', 3.1875)]
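
One thing to note: the function passed to reduceByKey must take two values of the same shape and return a value of that same shape, which is why the map step above pre-multiplies each share count by the weight. If you prefer to keep the raw (count, weight) pairs, aggregateByKey is an alternative. Below is a minimal sketch, not a transcript; the names pairs, weightedAlt, and weightedAltRes are just for illustration, and index access is used instead of tuple-unpacking lambdas so the snippet also runs under Python 3.

# keep (tweet, (share count, weight)) pairs as-is
pairs = data.map(lambda f: (f[0], (int(f[1]), float(f[3]))))
# seqFunc folds one (count, weight) record into the (weighted sum, weight sum) accumulator;
# combFunc merges two accumulators coming from different partitions
weightedAlt = pairs.aggregateByKey(
    (0.0, 0.0),
    lambda acc, cw: (acc[0] + cw[0] * cw[1], acc[1] + cw[1]),
    lambda a, b: (a[0] + b[0], a[1] + b[1]))
weightedAltRes = weightedAlt.mapValues(lambda acc: acc[0] / acc[1])

The result should match weightedRes above: one (tweet, weighted average share count) pair per tweet.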
 

Spark RDD map() vs. mapValues()

If you don't change the keys of your RDD, you should use mapValues, especially when you need to retain the original RDD's partitioning for performance reasons.

See the following description from Spark documentation:

mapValues: Pass each value in the key-value pair RDD through a map function without changing the keys; this also retains the original RDD’s partitioning.

map(self, f, preservesPartitioning=False) Return a new RDD by applying a function to each element of this RDD.

By default, map does not preserve partitioning. So after we run reduceByKey, if we only want to transform the value part of each record, we should use mapValues.

In [6]:
weightedRes2 = weighted.mapValues(lambda (weightedShare, totalWeight): (weightedShare, totalWeight, weightedShare / totalWeight))
weightedRes2.take(2)
Out[6]:
[('t2', (5.4, 1.1, 4.909090909090909)),
 ('t1', (5.1, 1.6, 3.1875))]
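
To see what "retains the original RDD's partitioning" means in practice, we can inspect the partitioner attribute of each RDD. A minimal sketch, reusing the weighted RDD from above:

# reduceByKey leaves the RDD hash-partitioned by key; mapValues keeps that
# partitioner, while a plain map() drops it because Spark cannot know
# whether the keys were changed
print(weighted.partitioner)                                    # a Partitioner object
print(weighted.mapValues(lambda v: v[0] / v[1]).partitioner)   # same partitioner, retained
print(weighted.map(lambda kv: (kv[0], kv[1][0])).partitioner)  # None, partitioning lost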
 

Get the max value from an RDD

If the RDD contains only a sequence of numbers, it's straightforward to get the maximum value by calling its max method.

In [7]:
rdd = sc.parallelize([1, 3, 4, 5, 6, 7, 9])
rdd.max()
Out[7]:
9
 

However, it's a little trickier if the elements of the RDD are tuples. For example, suppose we have a data set of (name, age, income) records; we can use the following code to find the element with the maximum age. The lambda expression specifies which value to use for the comparison.

In [8]:
rdd2 = sc.parallelize([('Jim', 30, 1000), ('Tom', 25, 3000), ('David', 28, 2500)])
rdd2.max(lambda (name, age, income): age)
Out[8]:
('Jim', 30, 1000)
In [9]:
# this will get the person with largest income
rdd2.max(lambda (name, age, income): income) 
Out[9]:
('Tom', 25, 3000)
 

Get the min value from an RDD

In [10]:
rdd2.min(lambda (name, age, income): age)
Out[10]:
('Tom', 25, 3000)
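
Note that the tuple-unpacking lambdas used with max and min above are Python 2 syntax. Under Python 3, the same queries can be written with index access via the key argument; a minimal, equivalent sketch:

rdd2.max(key=lambda person: person[1])   # oldest person
rdd2.max(key=lambda person: person[2])   # person with the largest income
rdd2.min(key=lambda person: person[1])   # youngest person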
 

Count Word Frequency in Spark

Spark RDD countByValue Method

Spark RDDs also provide a countByValue method, which can easily be used to count word frequencies in a document.

In [11]:
docRdd = sc.parallelize("this is the content of a document this document is just an example".split())
docRdd.countByValue()
Out[11]:
defaultdict(int,
            {'a': 1,
             'an': 1,
             'content': 1,
             'document': 2,
             'example': 1,
             'is': 2,
             'just': 1,
             'of': 1,
             'the': 1,
             'this': 2})
Be Careful

Since this method returns a defaultdict to the driver, if you have a large number of distinct words, your driver may not have enough memory to hold all of them. In that case, you should use the reduceByKey method instead. The following code keeps the word frequencies in an RDD, which can be safely saved to HDFS.

In [12]:
freqRdd = docRdd.map(lambda word: (word, 1)).reduceByKey(lambda wf1, wf2: wf1 + wf2)
#freqRdd.saveAsTextFile('hdfs file')
freqRdd.take(4)
Out[12]:
[('a', 1), ('just', 1), ('this', 2), ('is', 2)]
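
Relatedly, if you only need a handful of the most frequent words on the driver, takeOrdered brings back just the top entries instead of the whole dictionary. A minimal sketch; the cut-off of 3 is arbitrary:

# returns only the 3 most frequent (word, count) pairs to the driver
freqRdd.takeOrdered(3, key=lambda wf: -wf[1])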