Spark: Solving the Task not serializable Exception
One of the most common exceptions you will run into when using Spark is the Task not serializable exception:
org.apache.spark.SparkException: Task not serializable
This exception happens when you create a non-serializable object on the driver and then try to use it inside a function that runs on the executors.
Here is an example that produces such an exception. Suppose we have a non-serializable class named MyTokenlizer:
class MyTokenlizer {
  def tokens(input: String): Array[String] = {
    input.split(",")
  }
}
You submit a Spark job like this:
val obj = new MyTokenlizer()
val rdd = sc.parallelize(Array("helll, word", "hi, hii"))
rdd.map(line => (line, obj.tokens(line))).count()
Now you will get the org.apache.spark.SparkException: Task not serializable exception.
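The reason is that Spark has to ship the map closure, together with every object it captures (here obj), to the executors, and by default it does this with Java serialization. Since MyTokenlizer does not implement java.io.Serializable, that step fails. As a rough illustration outside of Spark (the ObjectOutputStream below is only for demonstration), plain Java serialization fails on this class in the same way:

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Standalone sketch: Java-serializing an instance of the non-serializable class
// fails just like Spark's closure serializer does.
val out = new ObjectOutputStream(new ByteArrayOutputStream())
out.writeObject(new MyTokenlizer())   // throws java.io.NotSerializableException: MyTokenlizer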
To solve this exception, you can make the class serializable by extending the Serializable trait:
class NewTokenlizer extends Serializable {
  def tokens(input: String): Array[String] = {
    input.split(",")
  }
}
val newT = new NewTokenlizer()
rdd.map(line => (line, newT.tokens(line))).count()
Now the program runs perfectly.
What if the class comes from a third-party library?
If the class comes from a third-party library, you have no control over it. In this case you can create the object inside the map function, like this:
rdd.map {
  // val obj = new MyTokenlizer()  -- declaring it here still fails: obj would be captured by the closure and serialized
  line => {
    val obj = new MyTokenlizer()
    (line, obj.tokens(line))
  }
}.count()
This is not efficient, as it creates a new object for every line in the RDD. We can use the mapPartitions method instead:
rdd.mapPartitions {
  // val obj = new MyTokenlizer()  -- declaring it here still fails for the same reason
  data => data.map {
    val obj = new MyTokenlizer()   // evaluated once per partition
    line => (line, obj.tokens(line))
  }
}.count()
In this case, the object is created only once per partition of the RDD, instead of once per line.
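For reference, here is a minimal, self-contained sketch that puts the mapPartitions pattern into a runnable application. It assumes Spark 2.x or later running in local mode; the object name TokenizeApp is just a placeholder, and MyTokenlizer is the class defined above:

import org.apache.spark.sql.SparkSession

object TokenizeApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("TokenizeApp").getOrCreate()
    val sc = spark.sparkContext

    val rdd = sc.parallelize(Array("helll, word", "hi, hii"))

    // The tokenizer is created inside the partition function, on the executor,
    // so no non-serializable object is captured from the driver.
    val count = rdd.mapPartitions { data =>
      val obj = new MyTokenlizer()                // one instance per partition
      data.map(line => (line, obj.tokens(line)))
    }.count()

    println(count)
    spark.stop()
  }
}

Declaring the tokenizer before data.map is equivalent to the block form above: either way the instance is built once per partition on the executor.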