run pyspark on oozie
In this post, I first give a workable example of running pySpark on Oozie. Then I show how to run pySpark on Oozie using your own Python installation (e.g., Anaconda). In this way, you can use numpy, pandas, and other Python libraries in your pySpark program.
The syntax for creating a Spark action in an Oozie workflow
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.3">
    ...
    <action name="[NODE-NAME]">
        <spark xmlns="uri:oozie:spark-action:0.1">
            <job-tracker>[JOB-TRACKER]</job-tracker>
            <name-node>[NAME-NODE]</name-node>
            <prepare>
                <delete path="[PATH]"/>
                ...
                <mkdir path="[PATH]"/>
                ...
            </prepare>
            <job-xml>[SPARK SETTINGS FILE]</job-xml>
            <configuration>
                <property>
                    <name>[PROPERTY-NAME]</name>
                    <value>[PROPERTY-VALUE]</value>
                </property>
                ...
            </configuration>
            <master>[SPARK MASTER URL]</master>
            <mode>[SPARK MODE]</mode>
            <name>[SPARK JOB NAME]</name>
            <class>[SPARK MAIN CLASS]</class>
            <jar>[SPARK DEPENDENCIES JAR / PYTHON FILE]</jar>
            <spark-opts>[SPARK-OPTIONS]</spark-opts>
            <arg>[ARG-VALUE]</arg>
            ...
            <arg>[ARG-VALUE]</arg>
            ...
        </spark>
        <ok to="[NODE-NAME]"/>
        <error to="[NODE-NAME]"/>
    </action>
    ...
</workflow-app>
As described in the Oozie documentation, these elements have the following meanings.

The prepare element, if present, indicates a list of paths to delete or create before starting the job. Specified paths must start with hdfs://HOST:PORT.

The job-xml element, if present, specifies a file containing configuration for the Spark job. Multiple job-xml elements are allowed in order to specify multiple job.xml files.

The configuration element, if present, contains configuration properties that are passed to the Spark job.

The master element indicates the URL of the Spark master. Ex: spark://host:port, mesos://host:port, yarn-cluster, yarn-client, or local.

The mode element, if present, indicates where the Spark driver program runs. Ex: client, cluster.

The name element indicates the name of the Spark application.

The class element, if present, indicates the Spark application's main class.

The jar element indicates a comma-separated list of jars or Python files.

The spark-opts element, if present, contains a list of Spark options that are passed to the Spark driver. Spark configuration options can be passed by specifying --conf key=value here, or via oozie.service.SparkConfigurationService.spark.configurations in oozie-site.xml. The spark-opts configs take priority.

The arg element, if present, contains arguments that are passed to the Spark application.
Please note that there is no <file> element. In the following example, the HDFS path of the Python script goes in the <jar> element.
<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.1"> ... <action name="spark_start"> <spark xmlns="uri:oozie:spark-action:0.1"> <job-tracker>${job_tracker}</job-tracker> <name-node>${name_node}</name-node> <prepare> <delete path="${output}"/> </prepare> <configuration> <property> <name>mapred.compress.map.output</name> <value>true</value> </property> </configuration> <master>yarn</master> <mode>cluster<mode> <name>My_Spark_Example</name> <class></class> <jar>hdfs://hostname:port/.../bin/my_python.py</jar> <spark-opts>--executor-memory 20G --num-executors 50 --archives hdfs://xxx:8020/user/xx/anaconda2.zip#anaconda_remote </spark-opts> <arg>${input_data_path}</arg> <arg>${output_path}</arg> </spark> <ok to="myotherjob"/> <error to="errorcleanup"/> </action> ... </workflow-app> |
The following is the python file: my_python.py
from pyspark import SparkContext, SparkConf
import sys

input = sys.argv[1]
out = sys.argv[2]

conf = SparkConf().setAppName('test_pyspark_oozie')
sc = SparkContext(conf=conf)

rdd = sc.textFile(input)
rdd.saveAsTextFile(out)
To make the above program work, we also need to put two files, py4j-0.9-src.zip and pyspark.zip, under the …/apps/lib/ folder of the workflow. These two files are located in $SPARK_HOME/python/lib/:

ls $SPARK_HOME/python/lib/
py4j-0.9-src.zip  pyspark.zip
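A way to stage them, assuming a hypothetical /user/xx/apps workflow directory on HDFS (adjust the path to wherever your workflow.xml lives):

# copy the zips from the local Spark installation, then upload them
# next to the workflow definition (the HDFS path is a placeholder)
cp $SPARK_HOME/python/lib/py4j-0.9-src.zip $SPARK_HOME/python/lib/pyspark.zip .
hdfs dfs -mkdir -p /user/xx/apps/lib
hdfs dfs -put py4j-0.9-src.zip pyspark.zip /user/xx/apps/lib/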
Otherwise, you will get the following exception:
Failing Oozie Launcher, Main class [org.apache.oozie.action.hadoop.SparkMain], main() threw exception, Missing py4j and/or pyspark zip files. Please add them to the lib folder or to the Spark sharelib.
org.apache.oozie.action.hadoop.OozieActionConfiguratorException: Missing py4j and/or pyspark zip files. Please add them to the lib folder or to the Spark sharelib.
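As the message suggests, an alternative to the per-workflow lib folder is the Oozie Spark sharelib. A hedged sketch, assuming the default sharelib layout under /user/oozie/share/lib (the lib_<timestamp> directory and the Oozie URL are placeholders):

# upload the zips into the spark sharelib directory, then tell Oozie to reload it
hdfs dfs -put py4j-0.9-src.zip pyspark.zip /user/oozie/share/lib/lib_<timestamp>/spark/
oozie admin -oozie http://oozie-host:11000/oozie -sharelibupdate

If you go this route, you typically also need oozie.use.system.libpath=true in job.properties.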
Run pyspark on oozie using your own python
Sometimes you have no control over the grid. An alternative is to ship your own Python installation or virtual Python environment to the grid using Spark's archives option.
In this section, I describe how to run pyspark using your own anaconda installation.
Download and install anaconda
bash Anaconda-2.2.0-Linux-x86_64.sh    # point the installation to ~/anaconda
export PATH=~/anaconda/bin:$PATH
cd ~/anaconda
zip -r anaconda.zip .
mv anaconda.zip ~/                     # move the zip back to the home directory
hdfs dfs -put ~/anaconda.zip hdfs://xxx:8020/tmp/xx/anaconda.zip
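Before uploading, it is worth checking that the interpreter sits directly under the archive root (bin/python), because the ./anaconda/bin/python and ./anaconda_remote/bin/python paths used below are resolved relative to the extracted zip:

# the listing should show bin/python at the top level of the archive
unzip -l ~/anaconda.zip | grep 'bin/python'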
When we submit Spark from the gateway, we need to set the PYSPARK_PYTHON variable.
Set PYSPARK_PYTHON
export PYSPARK_PYTHON=./anaconda/bin/python
We also need to set the PYSPARK_PYTHON environment variable on the executor nodes; put this config in your spark-submit command:

--conf spark.executorEnv.PYSPARK_PYTHON=./anaconda/bin/python

If we are running in cluster mode, we also have to set PYSPARK_PYTHON on the application master (the driver):

--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./anaconda/bin/python
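Putting these pieces together, a gateway-side spark-submit might look like the following sketch; the script name, input/output paths, and the HDFS location of the zip are placeholders:

export PYSPARK_PYTHON=./anaconda/bin/python
spark-submit \
    --master yarn --deploy-mode cluster \
    --conf spark.executorEnv.PYSPARK_PYTHON=./anaconda/bin/python \
    --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./anaconda/bin/python \
    --archives hdfs://xxx:8020/tmp/xx/anaconda.zip#anaconda \
    my_python.py /tmp/xx/input /tmp/xx/output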
Now we can set up the pyspark oozie environment using anaconda.
The following is a workable example of running pySpark on Oozie using our own Anaconda Python environment. Since we run Spark in yarn-cluster mode, we need to set both spark.executorEnv.PYSPARK_PYTHON and spark.yarn.appMasterEnv.PYSPARK_PYTHON:

--conf spark.executorEnv.PYSPARK_PYTHON=./anaconda_remote/bin/python
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./anaconda_remote/bin/python
The following is the workflow.xml.
<start to="start_spark" />
<action name="start_spark">
    <spark xmlns="uri:oozie:spark-action:0.1">
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
        <prepare>
            <delete path="/tmp/username/spark_oozie_test_out3"/>
        </prepare>
        <master>yarn-cluster</master>
        <mode>cluster</mode>
        <name>${spark_name}</name>
        <class>clear</class>
        <jar>${dir}/test_oozie_pyspark.py</jar>
        <spark-opts>--queue default --conf spark.ui.view.acls=*
            --executor-memory 2G --num-executors 2 --executor-cores 2 --driver-memory 3g
            --conf spark.executorEnv.PYSPARK_PYTHON=./anaconda_remote/bin/python
            --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./anaconda_remote/bin/python
            --archives hdfs://xxxx:8020/tmp/xxx/anaconda.zip#anaconda_remote
        </spark-opts>
        <arg>/tmp/xxx/spark_oozie_test</arg>
        <arg>/tmp/xxx/spark_oozie_test_out3</arg>
        <!--<file>${dir}/test_oozie_pyspark.py#test_oozie_pyspark.py</file>-->
    </spark>
    <ok to="end"/>
    <error to="fail"/>
</action>
The file test_oozie_pyspark.py is:
from pyspark import SparkContext, SparkConf
import numpy  # to test whether we can use numpy
import sys

input = sys.argv[1]
out = sys.argv[2]

conf = SparkConf().setAppName('test_pyspark_oozie')
sc = SparkContext(conf=conf)

rdd = sc.textFile(input)
x = numpy.sum([1, 2, 3])
rdd = rdd.map(lambda line: line + str(x))
rdd.saveAsTextFile(out)
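Once the workflow succeeds, every output line should end with 6 (the value of numpy.sum([1, 2, 3])), which confirms that numpy was importable on the executors:

# peek at the job output written by saveAsTextFile
hdfs dfs -cat /tmp/xxx/spark_oozie_test_out3/part-* | head -n 5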
In this post, I have shown how to run pySpark on Oozie using your own Python installation (e.g., Anaconda). This way, you can use libraries such as numpy and pandas in your pySpark program, even if they are not installed on the grid.
Reference:
https://oozie.apache.org/docs/4.2.0/DG_SparkActionExtension.html