PySpark broadcast variable example


PySpark broadcast variable

Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with every task. The concept of broadcast variables is similar to Hadoop's distributed cache.
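The IPython cells below assume a live SparkContext named sc, as the pyspark shell or a notebook kernel provides automatically. For a standalone script, a minimal setup sketch (the app name is arbitrary) might look like:

from pyspark import SparkContext

# The pyspark shell defines `sc` for you; a standalone script
# must create its own SparkContext before broadcasting anything.
sc = SparkContext(appName="broadcast-example")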

When to use broadcast variable

The best case for a broadcast variable is when you want to join two tables and one of them is small. By broadcasting the small table, we can implement a map-side join, which is much faster than a reduce-side join because it avoids the expensive shuffle.

Suppose we have the following RDD and want to join it with another data set. To implement a map-side join, we need to define a broadcast variable for the small data set.

In [2]:
a = [(1, (1, 2)), (2, (2, 3)), (3, (3, 4))]  # suppose this is the large data set. 
rdd_a = sc.parallelize(a)
rdd_a.take(1)
Out[2]:
[(1, (1, 2))]
 

How to define a broadcast variable

The following code shows how to define a broadcast variable from a local variable. We can use the .value attribute to access the broadcast data.

In [3]:
b = [(1, (1, 4)), (2, (2, 5)), (3, (4, 5))]
b_broad = sc.broadcast(b)  # returns a Broadcast object, not an RDD
b_broad.value[0]
Out[3]:
(1, (1, 4))
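A broadcast variable keeps a copy of its data cached on every executor until it is released. As a side note, PySpark's Broadcast object exposes unpersist() and destroy() for this; a minimal sketch:

b_broad.unpersist()  # executors drop their cached copies of the data
b_broad.destroy()    # release all resources; b_broad can no longer be used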
 

How to define a broadcast map or dictionary

The following code shows how to define a broadcast dictionary (hash table). Then we can access the data by key.

In [4]:
bmap = sc.parallelize(b).collectAsMap()
bmap[1] # the key is 1
Out[4]:
(1, 4)
In [5]:
bmap_broad = sc.broadcast(bmap)
bmap_broad.value[2]
Out[5]:
(2, 5)
In [6]:
bmap_broad.value[3]
Out[6]:
(4, 5)
In [7]:
bmap_broad.value[4] # the key does not exist
 
---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
<ipython-input-7-0777922f72fa> in <module>()
----> 1 bmap_broad.value[4]
KeyError: 4
In [8]:
4 in bmap_broad.value  # use this test to avoid the KeyError Exception
Out[8]:
False
In [9]:
3 in bmap_broad.value
Out[9]:
True
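Alternatively, since .value is an ordinary Python dictionary here, its .get method returns a default instead of raising a KeyError; the map-side join below relies on exactly this:

bmap_broad.value.get(4, '-')  # returns '-' for the missing key instead of raising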
 

An example of using a PySpark broadcast variable for a map-side join

The following implementation shows how to conduct a map-side join using a PySpark broadcast variable.

We can simply use bmap_broad.value[key] or bmap_broad.value.get(key, default) to look up a key in the broadcast dictionary. Note that .get is called on the dictionary stored in .value, not on the broadcast variable itself.

In [10]:
rdd_a.take(4)
Out[10]:
[(1, (1, 2)), (2, (2, 3)), (3, (3, 4))]
In [11]:
bmap_broad.value
Out[11]:
{1: (1, 4), 2: (2, 5), 3: (4, 5)}
In [12]:
# tuple-unpacking lambdas are Python 2 only; unpack the (key, value) pair explicitly
join_a_b = rdd_a.map(lambda kv: (kv[0], bmap_broad.value.get(kv[0], '-'), kv[1]))
In [13]:
join_a_b.take(5)
Out[13]:
[(1, (1, 4), (1, 2)), (2, (2, 5), (2, 3)), (3, (4, 5), (3, 4))]
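For comparison, the same join done reduce-side would parallelize b into an RDD and call .join, which shuffles both data sets across the cluster; a quick sketch (output order is not guaranteed):

rdd_b = sc.parallelize(b)
rdd_a.join(rdd_b).collect()
# roughly: [(1, ((1, 2), (1, 4))), (2, ((2, 3), (2, 5))), (3, ((3, 4), (4, 5)))]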