Feature engineering in PySpark
We often need to do feature transformations to build a training data set before training a model.
Here are some good examples that show how to transform your data, especially when you need to derive new features from other columns, using Spark DataFrames.
Encode and assemble multiple features in PySpark
from pyspark.sql import Row
from pyspark.ml.linalg import DenseVector

row = Row("gender", "foo", "bar")

df = sc.parallelize([
    row("0", 3.0, DenseVector([0, 2.1, 1.0])),
    row("1", 1.0, DenseVector([0, 1.1, 1.0])),
    row("1", -1.0, DenseVector([0, 3.4, 0.0])),
    row("0", -3.0, DenseVector([0, 4.1, 0.0]))
]).toDF()
First of all, StringIndexer:
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="gender", outputCol="gender_numeric").fit(df)
indexed_df = indexer.transform(df)
indexed_df.drop("bar").show()

## +------+----+--------------+
## |gender| foo|gender_numeric|
## +------+----+--------------+
## |     0| 3.0|           0.0|
## |     1| 1.0|           1.0|
## |     1|-1.0|           1.0|
## |     0|-3.0|           0.0|
## +------+----+--------------+
Next, OneHotEncoder:
from pyspark.ml.feature import OneHotEncoder

encoder = OneHotEncoder(inputCol="gender_numeric", outputCol="gender_vector")
encoded_df = encoder.transform(indexed_df)
encoded_df.drop("bar").show()

## +------+----+--------------+-------------+
## |gender| foo|gender_numeric|gender_vector|
## +------+----+--------------+-------------+
## |     0| 3.0|           0.0|(1,[0],[1.0])|
## |     1| 1.0|           1.0|    (1,[],[])|
## |     1|-1.0|           1.0|    (1,[],[])|
## |     0|-3.0|           0.0|(1,[0],[1.0])|
## +------+----+--------------+-------------+
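Note that in Spark 3.0 and later OneHotEncoder is an Estimator rather than a plain Transformer, so it takes inputCols/outputCols and has to be fit before it can transform. On those versions the block above would look roughly like this:

from pyspark.ml.feature import OneHotEncoder

# Spark 3.x: the encoder is fit first, then used to transform
encoder = OneHotEncoder(inputCols=["gender_numeric"], outputCols=["gender_vector"])
encoded_df = encoder.fit(indexed_df).transform(indexed_df)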
VectorIndexer and VectorAssembler:
from pyspark.ml.feature import VectorIndexer, VectorAssembler

vector_indexer = VectorIndexer(inputCol="bar", outputCol="bar_indexed")
assembler = VectorAssembler(
    inputCols=["gender_vector", "bar_indexed", "foo"],
    outputCol="features")

encoded_df_with_indexed_bar = (vector_indexer
    .fit(encoded_df)
    .transform(encoded_df))

final_df = assembler.transform(encoded_df_with_indexed_bar)
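To sanity-check what the assembler produced, you can peek at the combined column (a quick look, not part of the original answer):

# Show the full assembled vectors instead of truncating them
final_df.select("gender", "foo", "features").show(truncate=False)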
Finally, you can wrap all of that using pipelines:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[indexer, encoder, vector_indexer, assembler])
model = pipeline.fit(df)
transformed = model.transform(df)
Arguably this is a much more robust and clean approach than writing everything from scratch. There are some caveats, especially when you need consistent encoding between different datasets. You can read more in the official documentation for StringIndexer and VectorIndexer.
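One way to keep the encoding consistent is to fit the pipeline once and reuse (or persist) the resulting model for every dataset you transform. A rough sketch, assuming train_df and new_df are DataFrames with the same schema as df and the save path is illustrative:

# Fit once on the training data, then reuse the same fitted model so the
# learned string/vector indices stay identical across datasets.
model = pipeline.fit(train_df)

train_features = model.transform(train_df)
new_features = model.transform(new_df)

# The fitted model can also be saved and reloaded later.
from pyspark.ml import PipelineModel

model.write().overwrite().save("/tmp/feature_pipeline")
reloaded = PipelineModel.load("/tmp/feature_pipeline")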
Regarding your questions:
make a UDF with similar functionality that I can use in a Spark SQL query (or some other way, I suppose)
It is just a UDF like any other. Make sure you use supported types, and beyond that everything should work just fine.
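For illustration, a minimal sketch of such a UDF; the column name score, the scaling logic, and the view name are made up, not taken from the original question:

from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# Hypothetical UDF: scale a numeric column (illustrative logic only)
scale = udf(lambda x: float(x) / 100.0, DoubleType())
scaled = user_data.withColumn("score_scaled", scale(user_data["score"]))

# To call it from Spark SQL, register the function and query a temp view
spark.udf.register("scale", lambda x: float(x) / 100.0, DoubleType())
user_data.createOrReplaceTempView("user_data")
spark.sql("SELECT *, scale(score) AS score_scaled FROM user_data")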
take the RDD resulting from the map described above and add it as a new column to the user_data dataframe?
from pyspark.ml.linalg import VectorUDT
from pyspark.sql.types import StructType, StructField

schema = StructType([StructField("features", VectorUDT(), True)])
row = Row("features")

result.map(lambda x: row(DenseVector(x))).toDF(schema)
Note:
For Spark 1.x replace pyspark.ml.linalg with pyspark.mllib.linalg.
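The snippet above only turns the RDD into a DataFrame of its own. If you also need those vectors attached to user_data as a new column, one possible approach (just a sketch, and it assumes result was derived from user_data so both contain the same number of rows in the same order) is to pair rows by position and join on that index:

from pyspark.ml.linalg import DenseVector

# Pair each feature vector with its position in the RDD
indexed_features = (result
                    .zipWithIndex()
                    .map(lambda kv: (kv[1], DenseVector(kv[0])))
                    .toDF(["idx", "features"]))

# Pair each user_data row with its position as well
indexed_user_data = (user_data.rdd
                     .zipWithIndex()
                     .map(lambda kv: tuple(kv[0]) + (kv[1],))
                     .toDF(user_data.columns + ["idx"]))

# Join on the positional index and drop it again
user_data_with_features = indexed_user_data.join(indexed_features, "idx").drop("idx")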
Adding column to PySpark DataFrame depending on whether column value is in another column
I have a PySpark DataFrame with structure given by
[('u1', 1, [1, 2, 3]), ('u1', 4, [1, 2, 3])].toDF('user', 'item', 'fav_items')
I need to add a further column with 1 or 0 depending on whether ‘item’ is in ‘fav_items’ or not.
So I would want
[('u1', 1, [1, 2, 3], 1), ('u1', 4, [1, 2, 3], 0)]
How would I look up the second column in the third column to decide the value, and how would I then add the new column?
The following code does the requested task. A user-defined function receives two columns of the DataFrame as parameters. For each row it checks whether the item is in the item list: if the item is found it returns 1, otherwise 0.
from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType

# First we create an RDD in order to create a DataFrame:
rdd = sc.parallelize([('u1', 1, [1, 2, 3]), ('u1', 4, [1, 2, 3])])
df = rdd.toDF(['user', 'item', 'fav_items'])

# Print the DataFrame
df.show()

# Define a UDF that receives two columns and checks membership
function = udf(lambda item, items: 1 if item in items else 0, IntegerType())

df.select('user', 'item', 'fav_items',
          function(col('item'), col('fav_items')).alias('result')).show()
Here are the results:
+----+----+---------+
|user|item|fav_items|
+----+----+---------+
|  u1|   1|[1, 2, 3]|
|  u1|   4|[1, 2, 3]|
+----+----+---------+

+----+----+---------+------+
|user|item|fav_items|result|
+----+----+---------+------+
|  u1|   1|[1, 2, 3]|     1|
|  u1|   4|[1, 2, 3]|     0|
+----+----+---------+------+
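As a side note (not part of the original answer), the same result can be obtained without a Python UDF by leaning on the built-in array_contains SQL function, for example:

from pyspark.sql.functions import expr

# array_contains returns a boolean; cast it to int to get 1/0
df.withColumn("result", expr("cast(array_contains(fav_items, item) as int)")).show()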
Pandas: create two new columns in a dataframe with values calculated from a pre-existing column
I’d just use zip:
In [1]: from pandas import *

In [2]: def calculate(x):
   ...:     return x*2, x*3
   ...:

In [3]: df = DataFrame({'a': [1,2,3], 'b': [2,3,4]})

In [4]: df
Out[4]:
   a  b
0  1  2
1  2  3
2  3  4

In [5]: df["A1"], df["A2"] = zip(*df["a"].map(calculate))

In [6]: df
Out[6]:
   a  b  A1  A2
0  1  2   2   3
1  2  3   4   6
2  3  4   6   9
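A small aside, not from the original answer: for this particular calculation the same columns could also be created with plain vectorized arithmetic, or with apply returning a Series. A sketch of both:

import pandas as pd

df = pd.DataFrame({'a': [1, 2, 3], 'b': [2, 3, 4]})

# Vectorized: element-wise arithmetic avoids the Python-level map
df["A1"] = df["a"] * 2
df["A2"] = df["a"] * 3

# Or, for an arbitrary function returning several values per row
df[["A1", "A2"]] = df["a"].apply(lambda x: pd.Series((x * 2, x * 3)))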