We often need to perform feature transformation to build a training data set before training a model. Here are some good examples that show how to transform your data, especially when you need to derive new features from other columns, using Spark DataFrames.
Encode and assemble multiple features in PySpark
from pyspark.sql import Row
from pyspark.ml.linalg import DenseVector

row = Row("gender", "foo", "bar")

df = sc.parallelize([
    row("0", 3.0, DenseVector([0, 2.1, 1.0])),
    row("1", 1.0, DenseVector([0, 1.1, 1.0])),
    row("1", -1.0, DenseVector([0, 3.4, 0.0])),
    row("0", -3.0, DenseVector([0, 4.1, 0.0]))
]).toDF()
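If you are on Spark 2.0+ you can build the same frame without the RDD detour. This is a minimal sketch assuming an active SparkSession is already bound to the name spark:

from pyspark.ml.linalg import DenseVector

# Equivalent construction via SparkSession (Spark 2.0+);
# assumes a session named `spark` is already available.
df = spark.createDataFrame([
    ("0", 3.0, DenseVector([0, 2.1, 1.0])),
    ("1", 1.0, DenseVector([0, 1.1, 1.0])),
    ("1", -1.0, DenseVector([0, 3.4, 0.0])),
    ("0", -3.0, DenseVector([0, 4.1, 0.0])),
], ["gender", "foo", "bar"])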
First of all, StringIndexer:
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="gender", outputCol="gender_numeric").fit(df)
indexed_df = indexer.transform(df)
indexed_df.drop("bar").show()

## +------+----+--------------+
## |gender| foo|gender_numeric|
## +------+----+--------------+
## |     0| 3.0|           0.0|
## |     1| 1.0|           1.0|
## |     1|-1.0|           1.0|
## |     0|-3.0|           0.0|
## +------+----+--------------+
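If you later need to map the numeric indices back to the original string labels (for example when reporting predictions), IndexToString reverses the mapping. A short sketch using the fitted indexer's labels; the output column name here is just illustrative:

from pyspark.ml.feature import IndexToString

# Reverse the label-to-index mapping learned by the fitted StringIndexer.
converter = IndexToString(
    inputCol="gender_numeric",
    outputCol="gender_original",
    labels=indexer.labels)
converter.transform(indexed_df).select("gender", "gender_original").show()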
Next OneHotEncoder:
from pyspark.ml.feature import OneHotEncoder

encoder = OneHotEncoder(inputCol="gender_numeric", outputCol="gender_vector")
encoded_df = encoder.transform(indexed_df)
encoded_df.drop("bar").show()

## +------+----+--------------+-------------+
## |gender| foo|gender_numeric|gender_vector|
## +------+----+--------------+-------------+
## |     0| 3.0|           0.0|(1,[0],[1.0])|
## |     1| 1.0|           1.0|    (1,[],[])|
## |     1|-1.0|           1.0|    (1,[],[])|
## |     0|-3.0|           0.0|(1,[0],[1.0])|
## +------+----+--------------+-------------+
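Note that since Spark 3.0 OneHotEncoder is an estimator rather than a plain transformer, so it has to be fit before transforming. The equivalent call on Spark 3.0+ looks like this:

from pyspark.ml.feature import OneHotEncoder

# Spark 3.0+: OneHotEncoder must be fit first; it also accepts
# multiple columns at once via inputCols/outputCols.
encoder = OneHotEncoder(inputCols=["gender_numeric"],
                        outputCols=["gender_vector"])
encoded_df = encoder.fit(indexed_df).transform(indexed_df)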
VectorIndexer and VectorAssembler:
from pyspark.ml.feature import VectorIndexer, VectorAssembler

vector_indexer = VectorIndexer(inputCol="bar", outputCol="bar_indexed")
assembler = VectorAssembler(
    inputCols=["gender_vector", "bar_indexed", "foo"],
    outputCol="features")

encoded_df_with_indexed_bar = (vector_indexer
    .fit(encoded_df)
    .transform(encoded_df))

final_df = assembler.transform(encoded_df_with_indexed_bar)
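It is worth sanity-checking what ended up in the assembled column. One way, sketched below, is to print the feature vectors and inspect which slots of bar the VectorIndexer decided to treat as categorical; the model is refit here only because the one above was used inline and never bound to a name:

# Inspect the assembled feature vectors.
final_df.select("features").show(truncate=False)

# categoryMaps on the fitted VectorIndexerModel shows, per vector slot,
# the value-to-index mapping for slots treated as categorical.
vector_indexer_model = vector_indexer.fit(encoded_df)
print(vector_indexer_model.categoryMaps)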
Finally, you can wrap all of that using a Pipeline:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[indexer, encoder, vector_indexer, assembler])
model = pipeline.fit(df)
transformed = model.transform(df)
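A fitted PipelineModel can also be persisted and reloaded, which keeps the learned index and encoding mappings consistent between training and scoring. A minimal sketch; the path is illustrative and should point at storage writable from your cluster:

from pyspark.ml import PipelineModel

# Persist the fitted pipeline (path is an example only).
model.write().overwrite().save("/tmp/feature_pipeline")

# Reload it later and apply the exact same transformations to new data.
reloaded = PipelineModel.load("/tmp/feature_pipeline")
transformed_again = reloaded.transform(df)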
Arguably this is a much more robust and clean approach than writing everything from scratch.