Adding Multiple Columns to Spark DataFrames
I have been using Spark’s DataFrame API for quite some time, and I often want to add many columns to a DataFrame (for example, creating more features from existing features for a machine learning model). Writing many withColumn statements for this gets tedious, so I monkey patched the Spark DataFrame to make it easy to add multiple columns at once.
First, let’s create a udf_wrapper decorator to keep the code concise:

from pyspark.sql.functions import udf

def udf_wrapper(returntype):
    def udf_func(func):
        return udf(func, returnType=returntype)
    return udf_func
Let’s create a Spark DataFrame with the columns user_id and app_usage (a map from each app to its number of sessions).
Spark Window Functions for DataFrames and SQL
Introduced in Spark 1.4, Spark window functions improved the expressiveness of Spark DataFrames and Spark SQL. With window functions, you can easily calculate a moving average or cumulative sum, or reference a value in a previous row of a table. Window functions allow you to do many common calculations with DataFrames, without having to resort to RDD manipulation.
Aggregates, UDFs vs. Window functions
Window functions are complementary to existing DataFrame operations: aggregates, such as sum and avg, and UDFs. To review,
When using Spark, we often need to check whether an HDFS path exists before loading the data; if the path is not valid, we will get the following exception:
org.apache.hadoop.mapred.InvalidInputException: Input Pattern hdfs://…xxx matches 0 files
In this post, I describe two methods to check whether an HDFS path exists in PySpark.
The first method is to put the load into a try block: we try to read the first element from the RDD. If the file does not exist, a Py4JJavaError is raised; we catch the error and return False.
In this post, I first give a workable example of running PySpark on Oozie. Then I show how to run PySpark on Oozie using your own Python installation (e.g., Anaconda). This way, you can use numpy, pandas, and other Python libraries in your PySpark program.
The syntax for creating a Spark action in an Oozie workflow:
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.3">
    ...
    <action name="[NODE-NAME]">
        <spark xmlns="uri:oozie:spark-action:0.1">
            <job-tracker>[JOB-TRACKER]</job-tracker>
            <name-node>[NAME-NODE]</name-node>
            <prepare>
                <delete path="[PATH]"/>
                ...
                <mkdir path="[PATH]"/>
                ...
            </prepare>
            <job-xml>[SPARK SETTINGS FILE]</job-xml>
            <configuration>
                <property>
                    <name>[PROPERTY-NAME]</name>
                    <value>[PROPERTY-VALUE]</value>
                </property>
                ...
            </configuration>
            <master>[SPARK MASTER URL]</master>
            <mode>[SPARK MODE]</mode>
            <name>[SPARK JOB NAME]</name>
            <class>[SPARK MAIN CLASS]</class>
            <jar>[SPARK DEPENDENCIES JAR / PYTHON FILE]</jar>
            <spark-opts>[SPARK-OPTIONS]</spark-opts>
            <arg>[ARG-VALUE]</arg>
            ...
        </spark>
        <ok to="[NODE-NAME]"/>
        <error to="[NODE-NAME]"/>
    </action>
    ...
</workflow-app>
As described in the document, here are the meanings of these elements.
The prepare element, if present, indicates a list of paths to delete or create before starting the job. Specified paths must start with hdfs://HOST:PORT.
PySpark Unit Test
PySpark is a powerful framework for large-scale data analysis. Thanks to its easy-to-use API, you can develop PySpark programs quickly if you are familiar with Python.
One problem is that it is a little hard to unit test PySpark code. After some Google searching for “pyspark unit test”, I only found articles about using py.test or other complicated libraries. However, I don’t want to install any third-party libraries; what I want is to set up the PySpark unit-test environment based only on the unittest library.
In the previous post, we already introduced Spark, RDDs, and how to use RDDs to do basic data analysis. In this post, I will show more examples of how to use the RDD methods.
Spark RDD reduceByKey Method
We have used reduceByKey to solve the word frequency calculation problem. Here I will use a more complicated example to show how to use reduceByKey.
Suppose we have a set of tweets, each shared by different users. We also give each user a weight denoting his or her importance.
Here is an example of the data set.
In this post, I briefly introduce Spark and use examples to show how to use the popular RDD methods to analyze your data. You can refer to this post to set up the PySpark environment using IPython Notebook.
SparkContext, or Spark context, is the entry point for developing a Spark application using the Spark infrastructure.
Once a SparkContext object is created, it sets up the internal services and builds a connection to the cluster manager, which manages the actual executors that conduct the specific computations.
The following diagram from the Spark documentation visualizes the Spark architecture:
The SparkContext object is usually referenced as the variable sc.