Spark MLlib Example


In this post, I will walk through an example of how to use PySpark, showing how to train a Support Vector Machine with Spark MLlib and how to use the trained model to make predictions.

The following program was developed in an IPython Notebook. Please refer to this article on how to set up an IPython Notebook server for PySpark, if you want to run it that way. You can also run the program in other Python IDEs such as Spyder or PyCharm.
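Inside a notebook that has been configured for Spark, the sc variable is created for you. If you run the same code as a standalone script from an IDE, you have to create the SparkContext yourself. A minimal sketch (the application name and master URL below are just placeholders, adjust them for your own setup):

from pyspark import SparkConf, SparkContext

# "local[*]" runs Spark locally, using one worker thread per CPU core
conf = SparkConf().setAppName("mllib-svm-example").setMaster("local[*]")
sc = SparkContext(conf=conf)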

A simple example to demonstrate how to use sc, the default SparkContext object.

In [1]:
sc.parallelize([1,2,3,4]).collect()
Out[1]:
[1, 2, 3, 4]
In [2]:
rdd = sc.parallelize(["a b c d", "c d e f g", "a f e g k"])
 

An Example of RDD Map Method

We can use the map method to transform each line from a string into a list of words.

In [3]:

rddAry = rdd.map(lambda line: line.split())
In [4]:
rddAry.collect()
Out[4]:
[['a', 'b', 'c', 'd'], ['c', 'd', 'e', 'f', 'g'], ['a', 'f', 'e', 'g', 'k']]
 

An Example of RDD FlatMap Method

As you can see, flatMap flattens the nested sequences into a single flat sequence.

In [5]:
rdd.flatMap(lambda x:x.split()).collect()
Out[5]:
['a', 'b', 'c', 'd', 'c', 'd', 'e', 'f', 'g', 'a', 'f', 'e', 'g', 'k']
In [6]:
words = rddAry.flatMap(lambda x:x)
 

An Example of the Distinct and Count Methods
In [7]:

words.distinct().count()
Out[7]:
8
 

Use Spark to Count Word Frequency

In [8]:
wordsFreq = words.map(lambda x: (x, 1)).reduceByKey(lambda a, b: a + b)
In [9]:
wordsFreq.collect()
Out[9]:
[('a', 2),
 ('c', 2),
 ('e', 2),
 ('k', 1),
 ('g', 2),
 ('b', 1),
 ('d', 2),
 ('f', 2)]
In [10]:
freqLg1 = wordsFreq.filter(lambda wf: wf[1] > 1)
In [11]:
freqLg1.collect()
Out[11]:
[('a', 2), ('c', 2), ('e', 2), ('g', 2), ('d', 2), ('f', 2)]
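If you want to see the most frequent words first, you can also sort the (word, count) pairs by their counts. A small sketch using the RDD sortBy method:

# Sort the (word, count) pairs by count, highest first
wordsFreq.sortBy(lambda wf: wf[1], ascending=False).collect()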
 

An Example of using numpy in Spark

In [12]:
import numpy
In [13]:
ary = numpy.array([1,2,3,4])
In [14]:
ary.sum()
Out[14]:
10
In [15]:
def arySum(input):
    import numpy
    res = numpy.array(input).astype(int).sum()
    return int(res)
 

In this example, we use numpy to do the data transformation. The arySum function is implemented with numpy; importing numpy inside the function guarantees the import also runs on the worker process that executes the task (numpy still has to be installed on every worker node).

In [16]:
data = ["1 2 3 4 5", "2 3 4 5 6 7"]
rdd = sc.parallelize(data).map(lambda x: x.split()).map(lambda x: arySum(x))
In [17]:
rdd.collect()
Out[17]:
[15, 27]
In [18]:
from pyspark.mllib.classification import SVMWithSGD, SVMModel
from pyspark.mllib.regression import LabeledPoint
# Load and parse the data
def parsePoint(line):
    # Each line is "label f1 f2 ... fn", all space-separated floats
    values = [float(x) for x in line.split(' ')]
    try:
        res = LabeledPoint(values[0], values[1:])
    except Exception as e:
        raise Exception(str(e) + str(values))
    return res
In [19]:
svmData = """
1 0 2.52078447201548 0 0 0 2.004684436494304 2.000347299268466 0 2.228387042742021 2.228387042742023 0 0 0 0 0 0
0 2.857738033247042 0 0 2.619965104088255 0 2.004684436494304 2.000347299268466 0 2.228387042742021 2.228387042742023 0 0 0 0 0 0
0 2.857738033247042 0 2.061393766919624 0 0 2.004684436494304 0 0 2.228387042742021 2.228387042742023 0 0 0 0 0 0
1 0 0 2.061393766919624 2.619965104088255 0 2.004684436494304 2.000347299268466 0 0 0 0 2.055002875864414 0 0 0 0
1 2.857738033247042 0 2.061393766919624 2.619965104088255 0 2.004684436494304 0 0 0 0 0 2.055002875864414 0 0 0 0
0 2.857738033247042 0 2.061393766919624 2.619965104088255 0 2.004684436494304 2.000347299268466 0 2.228387042742021 2.228387042742023 0 0 0 0 0 0
1 0 0 0 2.619965104088255 0 2.004684436494304 0 0 2.228387042742021 2.228387042742023 0 2.055002875864414 0 0 0 0
1 0 0 0 2.619965104088255 0 2.004684436494304 0 0 2.228387042742021 2.228387042742023 0 2.055002875864414 0 0 0 0
0 2.857738033247042 0 2.061393766919624 2.619965104088255 0 2.004684436494304 2.000347299268466 2.122974378789621 2.228387042742021 2.228387042742023 0 0 0 0 12.72816758217773 0
0 2.857738033247042 0 0 2.619965104088255 0 0 0 0 2.228387042742021 2.228387042742023 0 2.055002875864414 0 0 0 0
1 2.857738033247042 0 0 2.619965104088255 0 0 2.000347299268466 0 2.228387042742021 2.228387042742023 0 0 0 0 0 0
1 2.857738033247042 0 0 2.619965104088255 0 2.004684436494304 2.000347299268466 2.122974378789621 0 0 0 0 0 0 0 0
1 0 0 0 0 4.745052855503306 2.004684436494304 0 2.122974378789621 2.228387042742021 2.228387042742023 0 0 0 0 0 0
"""
In [20]:
svmData.strip().split('\n')[0]
Out[20]:
'1 0 2.52078447201548 0 0 0 2.004684436494304 2.000347299268466 0 2.228387042742021 2.228387042742023 0 0 0 0 0 0'
In [21]:
data = sc.parallelize(svmData.strip().split('\n'))
In [22]:
data.take(1)
Out[22]:
['1 0 2.52078447201548 0 0 0 2.004684436494304 2.000347299268466 0 2.228387042742021 2.228387042742023 0 0 0 0 0 0']
In [23]:
parsedData = data.map(parsePoint)
In [24]:
parsedData.take(1)
Out[24]:
[LabeledPoint(1.0, [0.0,2.52078447202,0.0,0.0,0.0,2.00468443649,2.00034729927,0.0,2.22838704274,2.22838704274,0.0,0.0,0.0,0.0,0.0,0.0])]
 

A Spark MLlib Example

This example shows how to train a Support Vector Machine using Spark MLlib. The training data above is plain space-separated text: the first value on each line is the label, and the remaining values are the features. We load the lines into an RDD and parse each one into a labeled feature vector.

The parsePoint method transforms each line into a LabeledPoint object.
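In this post the training data is embedded as a string for convenience. If the data lived in a text file instead, it could be loaded and parsed the same way. A minimal sketch (the file path below is only a placeholder):

# Hypothetical path; point this at wherever your data file actually lives
rawData = sc.textFile("sample_svm_data.txt")
parsedData = rawData.map(parsePoint)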

In [25]:
# Build the model
model = SVMWithSGD.train(parsedData, iterations=100)
# Evaluating the model on training data
labelsAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))
trainErr = labelsAndPreds.filter(lambda vp: vp[0] != vp[1]).count() / float(parsedData.count())
print("Training Error = " + str(trainErr))
# Save and load model
model.save(sc, "myModelPath")
sameModel = SVMModel.load(sc, "myModelPath")
 
Training Error = 0.0769230769231
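Finally, the trained model can be used to make predictions on new feature vectors, which was the goal stated at the beginning of the post. A small sketch that reuses points from the training set purely for illustration:

# Predict the class (0 or 1) of a single feature vector
point = parsedData.first().features
print(model.predict(point))

# predict also accepts an RDD of feature vectors and returns an RDD of labels
predictions = model.predict(parsedData.map(lambda p: p.features))
print(predictions.take(5))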