# Basics of Map Reduce Algorithm Explained with a Simple Example

When processing a large set of data, the application code must address both scalability and efficiency.

Map reduce algorithm (or flow) is highly effective in handling big data.

Let us take a simple example and use map reduce to solve a problem.

Say you are processing a large amount of data and trying to find out what percentage of your user base was talking about games.

First, we will identify the keywords that we are going to map from the data to conclude that something is related to games.

Next, we will write a mapping function to identify such patterns in our data. For example, the keywords can be Gold medals, Bronze medals, Silver medals, Olympic football, basketball, cricket, etc.

Let us take the following chunks from a big data set and see how to process them.

“Hi, how are you”
“We love football”
“He is an awesome football player”
“Merry Christmas”
“Olympics will be held in China”
“Records broken today in Olympics”
“Yes, we won 2 Gold medals”
“He qualified for Olympics”

### Mapping Phase

So the map phase of our algorithm will be as follows:

1. Declare a function “Map”
2. Loop: for each word equal to “football”
3. Increment counter
4. Return key value “football”=>counter

In the same way, we can define any number of mapping functions for mapping various words: “Olympics”, “Gold medals”, “cricket”, etc.

### Reducing Phase

The reducing function will accept the input from all these mappers in the form of key-value pairs and then process it. So, the input to the reduce function will look like the following:

• reduce(“football”=>2)
• reduce(“Olympics”=>3)

Our algorithm will continue with the following steps:

5. Declare a function “Reduce” to accept the values from the map functions.
6. For each key-value pair, add the value to a counter.
7. Return “games”=> counter.

At the end, we will get the output like “games”=>5.
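The whole toy pipeline above can be sketched in a few lines of Python (a single-machine illustration; the chunk list and the two mapped keywords are taken from the example):

```python
# The eight chunks from the example, with the two keywords the mappers look for.
chunks = [
    "Hi, how are you",
    "We love football",
    "He is an awesome football player",
    "Merry Christmas",
    "Olympics will be held in China",
    "Records broken today in Olympics",
    "Yes, we won 2 Gold medals",
    "He qualified for Olympics",
]
keywords = {"football", "olympics"}

def map_chunk(chunk):
    # Emit (keyword, 1) for every keyword found in the chunk.
    text = chunk.lower()
    return [(k, 1) for k in keywords if k in text]

def reduce_games(pairs):
    # Sum the counters from all mappers into one "games" total.
    return sum(count for _, count in pairs)

mapped = [pair for chunk in chunks for pair in map_chunk(chunk)]
print(reduce_games(mapped))  # -> 5, i.e. "games" => 5
```

In a real cluster the `map_chunk` calls would run in parallel on separate nodes; here the list comprehension plays that role.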

Now, looking at the bigger picture, we can write any number of mapper functions here. Let us say that you want to know who was wishing each other. In this case, you will write a mapping function to map words like “Wishing”, “Wish”, “Happy”, and “Merry”, and then write a corresponding reducer function.

Here you will need a shuffling function, which will distinguish between the “games” and “wishing” keys returned by the mappers and send them to the respective reducer functions.

Similarly, you may need a splitting function at the start to feed inputs to the mapper functions in the form of chunks.

### Flow of Map Reduce Algorithm

The following diagram summarizes the flow of the map reduce algorithm. In this map reduce flow:

1. The input data can be divided into n chunks, depending upon the amount of data and the processing capacity of each individual unit.
2. Next, the chunks are passed to the mapper functions. Note that all the chunks are processed simultaneously, which embraces the parallel processing of data.
3. After that, shuffling happens which leads to aggregation of similar patterns.
4. Finally, reducers combine them all to get a consolidated output as per the logic.
5. This algorithm embraces scalability: depending on the size of the input data, we can keep increasing the number of parallel processing units.

# MapReduce Example: Using MapReduce to Find Friends

MapReduce is a framework originally developed at Google that allows for easy large scale distributed computing across a number of domains. Apache Hadoop is an open source implementation.

I’ll gloss over the details, but it comes down to defining two functions: a map function and a reduce function. The map function takes a value and outputs key:value pairs. For instance, if we define a map function that takes a string and outputs the length of the word as the key and the word itself as the value, then map(steve) would return 5:steve and map(savannah) would return 8:savannah. You may have noticed that the map function is stateless and only requires the input value to compute its output value. This allows us to run the map function against values in parallel, which provides a huge advantage. Before we get to the reduce function, the mapreduce framework groups all of the values together by key, so if the map functions output the following key:value pairs:

3 : the
3 : and
3 : you
4 : then
4 : what
4 : when
5 : steve
5 : where
8 : savannah
8 : research

They get grouped as:

3 : [the, and, you]
4 : [then, what, when]
5 : [steve, where]
8 : [savannah, research]

Each of these lines would then be passed as an argument to the reduce function, which accepts a key and a list of values. In this instance, we might be trying to figure out how many words of certain lengths exist, so our reduce function will just count the number of items in the list and output the key with the size of the list, like:

3 : 3
4 : 3
5 : 2
8 : 2

The reductions can also be done in parallel, again providing a huge advantage. We can then look at these final results and see that there were only two words of length 5 in our corpus, etc…

The most common example of mapreduce is for counting the number of times words occur in a corpus. Suppose you had a copy of the internet (I’ve been fortunate enough to have worked in such a situation), and you wanted a list of every word on the internet as well as how many times it occurred.

The way you would approach this would be to tokenize the documents you have (break them into words), and pass each word to a mapper. The mapper would then spit the word back out along with a value of 1. The grouping phase will take all the keys (in this case words), and make a list of 1’s. The reduce phase then takes a key (the word) and a list (a list of 1’s for every time the key appeared on the internet), and sums the list. The reducer then outputs the word, along with its count. When all is said and done you’ll have a list of every word on the internet, along with how many times it appeared.
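This word-count “Hello, World” can be traced in plain Python (a single-process illustration, not a distributed implementation; the documents are made up):

```python
from collections import defaultdict

def map_doc(doc):
    # Tokenize the document and emit (word, 1) for each occurrence.
    return [(word, 1) for word in doc.lower().split()]

def group(pairs):
    # Grouping phase: collect all values emitted under the same key.
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups

def reduce_word(word, ones):
    # Reduce phase: sum the list of 1's for the word.
    return word, sum(ones)

docs = ["the quick brown fox", "the lazy dog", "the fox"]
pairs = [p for d in docs for p in map_doc(d)]
counts = dict(reduce_word(w, ones) for w, ones in group(pairs).items())
print(counts["the"])  # -> 3
```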

Easy, right? If you’ve ever read about mapreduce, the above scenario isn’t anything new… it’s the “Hello, World” of mapreduce. So here is a real world use case (Facebook may or may not actually do the following, it’s just an example):

Facebook has a list of friends (note that friends are a bi-directional thing on Facebook. If I’m your friend, you’re mine). They also have lots of disk space and they serve hundreds of millions of requests every day. They’ve decided to pre-compute calculations when they can to reduce the processing time of requests. One common processing request is the “You and Joe have 230 friends in common” feature. When you visit someone’s profile, you see a list of friends that you have in common. This list doesn’t change frequently so it’d be wasteful to recalculate it every time you visited the profile (sure you could use a decent caching strategy, but then I wouldn’t be able to continue writing about mapreduce for this problem). We’re going to use mapreduce so that we can calculate everyone’s common friends once a day and store those results. Later on it’s just a quick lookup. We’ve got lots of disk, it’s cheap.

Assume the friends are stored as Person->[List of Friends], our friends list is then:

A -> B C D
B -> A C D E
C -> A B D E
D -> A B C E
E -> B C D

Each line will be an argument to a mapper. For every friend in the list of friends, the mapper will output a key-value pair. The key will be a friend along with the person. The value will be the list of friends. The key will be sorted so that the friends are in order, causing all pairs of friends to go to the same reducer. This is hard to explain with text, so let’s just do it and see if you can see the pattern. After all the mappers are done running, you’ll have a list like this:

For map(A -> B C D) :

(A B) -> B C D
(A C) -> B C D
(A D) -> B C D

For map(B -> A C D E) : (Note that A comes before B in the key)

(A B) -> A C D E
(B C) -> A C D E
(B D) -> A C D E
(B E) -> A C D E

For map(C -> A B D E) :

(A C) -> A B D E
(B C) -> A B D E
(C D) -> A B D E
(C E) -> A B D E

For map(D -> A B C E) :

(A D) -> A B C E
(B D) -> A B C E
(C D) -> A B C E
(D E) -> A B C E

And finally for map(E -> B C D):

(B E) -> B C D
(C E) -> B C D
(D E) -> B C D

Before we send these key-value pairs to the reducers, we group them by their keys and get:

(A B) -> (A C D E) (B C D)
(A C) -> (A B D E) (B C D)
(A D) -> (A B C E) (B C D)
(B C) -> (A B D E) (A C D E)
(B D) -> (A B C E) (A C D E)
(B E) -> (A C D E) (B C D)
(C D) -> (A B C E) (A B D E)
(C E) -> (A B D E) (B C D)
(D E) -> (A B C E) (B C D)

Each line will be passed as an argument to a reducer. The reduce function will simply intersect the lists of values and output the same key with the result of the intersection. For example, reduce((A B) -> (A C D E) (B C D)) will output (A B) : (C D) and means that friends A and B have C and D as common friends.

The result after reduction is:

(A B) -> (C D)
(A C) -> (B D)
(A D) -> (B C)
(B C) -> (A D E)
(B D) -> (A C E)
(B E) -> (C D)
(C D) -> (A B E)
(C E) -> (B D)
(D E) -> (B C)

Now when D visits B’s profile, we can quickly look up (B D) and see that they have three friends in common, (A C E).
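The whole friends example can be reproduced in a few lines of Python (a single-machine sketch of the map, group, and reduce steps above, using the same five-person friends list):

```python
from collections import defaultdict

# The friends list from the example above.
friends = {
    "A": ["B", "C", "D"],
    "B": ["A", "C", "D", "E"],
    "C": ["A", "B", "D", "E"],
    "D": ["A", "B", "C", "E"],
    "E": ["B", "C", "D"],
}

def map_person(person, friend_list):
    # Emit a sorted (person, friend) pair as the key, the friend list as the value.
    for friend in friend_list:
        yield tuple(sorted((person, friend))), set(friend_list)

# Grouping phase: collect both friend lists for every pair key.
grouped = defaultdict(list)
for person, friend_list in friends.items():
    for key, value in map_person(person, friend_list):
        grouped[key].append(value)

def reduce_pair(key, lists):
    # Intersect the two lists to get the friends the pair has in common.
    return key, sorted(set.intersection(*lists))

results = dict(reduce_pair(k, v) for k, v in grouped.items())
print(results[("B", "D")])  # -> ['A', 'C', 'E']
```

Sorting the key `(person, friend)` is what sends both halves of each friendship to the same reducer, exactly as described above.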

# MAPREDUCE PATTERNS, ALGORITHMS, AND USE CASES

In this article I digested a number of MapReduce patterns and algorithms to give a systematic view of the different techniques that can be found on the web or in scientific articles. Several practical case studies are also provided. All descriptions and code snippets use the standard Hadoop MapReduce model with Mappers, Reducers, Combiners, Partitioners, and sorting. This framework is depicted in the figure below.

# Basic MapReduce Patterns

## Counting and Summing

Problem Statement: There are a number of documents, where each document is a set of terms. It is required to calculate the total number of occurrences of each term across all documents. Alternatively, it can be an arbitrary function of the terms. For instance, there is a log file where each record contains a response time and it is required to calculate the average response time.

Solution:

Let’s start with something really simple. The code snippet below shows a Mapper that simply emits “1” for each term it processes and a Reducer that goes through the lists of ones and sums them up:

```
class Mapper
   method Map(docid id, doc d)
      for all term t in doc d do
         Emit(term t, count 1)

class Reducer
   method Reduce(term t, counts [c1, c2,...])
      sum = 0
      for all count c in [c1, c2,...] do
          sum = sum + c
      Emit(term t, count sum)
```

The obvious disadvantage of this approach is the high number of dummy counters emitted by the Mapper. The Mapper can decrease the number of counters by summing them for each document:

```
class Mapper
   method Map(docid id, doc d)
      H = new AssociativeArray
      for all term t in doc d do
          H{t} = H{t} + 1
      for all term t in H do
         Emit(term t, count H{t})
```

In order to accumulate counters not only for one document, but for all documents processed by one Mapper node, we can leverage Combiners:

```
class Mapper
   method Map(docid id, doc d)
      for all term t in doc d do
         Emit(term t, count 1)

class Combiner
   method Combine(term t, [c1, c2,...])
      sum = 0
      for all count c in [c1, c2,...] do
          sum = sum + c
      Emit(term t, count sum)

class Reducer
   method Reduce(term t, counts [c1, c2,...])
      sum = 0
      for all count c in [c1, c2,...] do
          sum = sum + c
      Emit(term t, count sum)
```
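A rough single-process Python rendering of the Combiner variant, where each “Mapper node” pre-sums its own counters before anything is shuffled (the document contents are made up for illustration):

```python
from collections import Counter, defaultdict

def map_doc(doc):
    # Emit (term, 1) for every term occurrence.
    return [(term, 1) for term in doc.split()]

def combine(pairs):
    # Combiner: pre-sums the counters produced on one Mapper node,
    # so fewer pairs cross the network.
    summed = Counter()
    for term, count in pairs:
        summed[term] += count
    return list(summed.items())

def reduce_term(term, counts):
    return term, sum(counts)

# Two "Mapper nodes", each processing its own share of the documents.
node1_docs = ["big data big ideas", "big plans"]
node2_docs = ["data data everywhere"]
node1_out = combine([p for d in node1_docs for p in map_doc(d)])
node2_out = combine([p for d in node2_docs for p in map_doc(d)])

# Shuffle: group the combined outputs by term, then reduce.
groups = defaultdict(list)
for term, count in node1_out + node2_out:
    groups[term].append(count)
result = dict(reduce_term(t, c) for t, c in groups.items())
print(result["big"], result["data"])  # -> 3 3
```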

### Applications:

Log Analysis, Data Querying

## Collating

Problem Statement: There is a set of items and some function of one item. It is required to save all items that have the same function value into one file, or to perform some other computation that requires all such items to be processed as a group. The most typical example is the building of inverted indexes.

Solution:

The solution is straightforward. The Mapper computes the given function for each item and emits the function value as a key and the item itself as a value. The Reducer obtains all items grouped by function value and processes or saves them. In the case of inverted indexes, the items are documents (document IDs) and the function value is a term found in the document, so the Reducer collects the list of documents for each term.
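A minimal Python sketch of collating as an inverted index (the three documents are illustrative; a dictionary stands in for the grouping that the framework performs):

```python
from collections import defaultdict

docs = {"d1": "apple banana", "d2": "banana cherry", "d3": "apple"}

def map_doc(doc_id, text):
    # Emit (term, doc_id): the function value (the term) becomes the key.
    return [(term, doc_id) for term in text.split()]

# Grouping by key builds the inverted index: term -> list of documents.
index = defaultdict(list)
for doc_id, text in docs.items():
    for term, d in map_doc(doc_id, text):
        index[term].append(d)

print(sorted(index["banana"]))  # -> ['d1', 'd2']
```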

### Applications:

Inverted Indexes, ETL

## Filtering (“Grepping”), Parsing, and Validation

Problem Statement: There is a set of records and it is required to collect all records that meet some condition, or to transform each record (independently from other records) into another representation. The latter case includes such tasks as text parsing and value extraction, or conversion from one format to another.

Solution: The solution is absolutely straightforward – the Mapper takes records one by one and emits the accepted items or their transformed versions.

### Applications:

Log Analysis, Data Querying, ETL, Data Validation

## Distributed Task Execution

Problem Statement: There is a large computational problem that can be divided into multiple parts, and the results from all parts can be combined together to obtain the final result.

Solution: The problem description is split into a set of specifications, and the specifications are stored as input data for the Mappers. Each Mapper takes a specification, performs the corresponding computations, and emits the results. The Reducer combines all the emitted parts into the final result.

### Case Study: Simulation of a Digital Communication System

There is a software simulator of a digital communication system, like WiMAX, that passes some volume of random data through the system model and computes the error probability. Each Mapper runs the simulation for a specified amount of data, which is 1/N-th of the required sampling, and emits its error rate. The Reducer computes the average error rate.
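The same split/simulate/average structure can be sketched with a generic Monte Carlo computation; estimating pi here is an illustrative stand-in for the channel simulation:

```python
import random

def map_simulate(samples, seed):
    # Each Mapper runs 1/N-th of the sampling and emits its estimate.
    rng = random.Random(seed)
    hits = sum(rng.random() ** 2 + rng.random() ** 2 <= 1.0
               for _ in range(samples))
    return 4.0 * hits / samples

def reduce_average(estimates):
    # Reducer: average the per-Mapper results.
    return sum(estimates) / len(estimates)

# Eight "Mappers", each simulating its own share of the samples.
estimates = [map_simulate(10_000, seed) for seed in range(8)]
print(round(reduce_average(estimates), 1))  # -> 3.1
```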

### Applications:

Physical and Engineering Simulations, Numerical Analysis, Performance Testing

## Sorting

Problem Statement: There is a set of records and it is required to sort these records by some rule or process these records in a certain order.

Solution: Simple sorting is absolutely straightforward – the Mappers just emit all items as values associated with sorting keys, which are assembled as a function of the items. Nevertheless, in practice sorting is often used in quite a tricky way, which is why it is said to be the heart of MapReduce (and Hadoop). In particular, it is very common to use composite keys to achieve secondary sorting and grouping.

Sorting in MapReduce is originally intended for sorting the emitted key-value pairs by key, but there exist techniques that leverage Hadoop implementation specifics to achieve sorting by values.

It is worth noting that if MapReduce is used for sorting the original (not intermediate) data, it is often a good idea to continuously maintain the data in a sorted state using BigTable concepts. In other words, it can be more efficient to sort the data once during insertion than to sort it for each MapReduce query.

### Applications:

ETL, Data Analysis

# Not-So-Basic MapReduce Patterns

## Iterative Message Passing (Graph Processing)

Problem Statement: There is a network of entities and relationships between them. It is required to calculate the state of each entity on the basis of the properties of the other entities in its neighborhood. This state can represent a distance to other nodes, an indication that there is a neighbor with certain properties, a characteristic of neighborhood density, and so on.

Solution: The network is stored as a set of nodes, and each node contains a list of adjacent node IDs. Conceptually, MapReduce jobs are performed in an iterative way, and at each iteration each node sends messages to its neighbors. Each neighbor updates its state on the basis of the received messages. Iterations are terminated by some condition, like a fixed maximal number of iterations (say, the network diameter) or negligible changes in states between two consecutive iterations. From the technical point of view, the Mapper emits messages for each node using the ID of the adjacent node as a key. As a result, all messages are grouped by the receiving node, and the Reducer is able to recompute the state and rewrite the node with its new state. This algorithm is shown below:

```
class Mapper
   method Map(id n, object N)
      Emit(id n, object N)
      for all id m in N.OutgoingRelations do
         Emit(id m, message getMessage(N))

class Reducer
   method Reduce(id m, [s1, s2,...])
      M = null
      messages = []
      for all s in [s1, s2,...] do
          if IsObject(s) then
             M = s
          else               // s is a message
             messages.add(s)
      M.State = calculateState(messages)
      Emit(id m, item M)
```

It should be emphasized that the state of one node rapidly propagates across the whole network if the network is not too sparse, because all nodes that were “infected” by this state start to “infect” all their neighbors. This process is illustrated in the figure below:

### Case Study: Availability Propagation Through The Tree of Categories

Problem Statement: This problem is inspired by a real-life eCommerce task. There is a tree of categories that branches out from large categories (like Men, Women, Kids) to smaller ones (like Men Jeans or Women Dresses), and eventually to small end-of-line categories (like Men Blue Jeans). An end-of-line category is either available (contains products) or not. A higher-level category is available if there is at least one available end-of-line category in its subtree. The goal is to calculate the availabilities for all categories if the availabilities of the end-of-line categories are known.

Solution: This problem can be solved using the framework described in the previous section. We define the getMessage and calculateState methods as follows:

```
class N
   State in {True = 2, False = 1, null = 0},
      initialized 1 or 2 for end-of-line categories, 0 otherwise

method getMessage(object N)
   return N.State

method calculateState(state s, data [d1, d2,...])
   return max( [d1, d2,...] )
```

### Case Study: Breadth-First Search

Problem Statement: There is a graph and it is required to calculate the distance (the number of hops) from one source node to all other nodes in the graph.

Solution: The source node emits 0 to all its neighbors, and the neighbors propagate this counter, incrementing it by 1 on each hop:

```
class N
   State is distance, initialized 0 for source node, INFINITY for all other nodes

method getMessage(N)
   return N.State + 1

method calculateState(state s, data [d1, d2,...])
   return min( [d1, d2,...] )
```
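A compact single-process Python rendering of this message-passing scheme for shortest paths (the graph is illustrative; a dictionary stands in for the shuffle between iterations):

```python
INF = float("inf")

# Adjacency list; distances initialized to 0 for the source, infinity otherwise.
graph = {"s": ["a", "b"], "a": ["c"], "b": ["c"], "c": ["d"], "d": []}
state = {n: (0 if n == "s" else INF) for n in graph}

def mapper(node, dist):
    # Pass the node itself through, and send dist + 1 to every neighbor.
    yield node, ("node", dist)
    for m in graph[node]:
        yield m, ("msg", dist + 1)

def reducer(node, values):
    # New state is the minimum of the old distance and all incoming messages.
    return min(v for _, v in values)

for _ in range(len(graph)):          # iterate up to the network diameter
    groups = {}
    for n, d in state.items():
        for key, val in mapper(n, d):
            groups.setdefault(key, []).append(val)
    state = {n: reducer(n, vals) for n, vals in groups.items()}

print(state["d"])  # -> 3
```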

### Case Study: PageRank and Mapper-Side Data Aggregation

This algorithm was suggested by Google to calculate the relevance of a web page as a function of the authoritativeness (PageRank) of the pages that link to it. The real algorithm is quite complex, but at its core it is just a propagation of weights between nodes, where each node calculates its weight from the weights passed along its incoming links:

```
class N
    State is PageRank

method getMessage(object N)
    return N.State / N.OutgoingRelations.size()

method calculateState(state s, data [d1, d2,...])
    return sum( [d1, d2,...] )
```

It is worth mentioning that the schema we use is too generic and doesn’t take advantage of the fact that the state is a numerical value. In most practical cases, we can perform aggregation of values on the Mapper side by virtue of this fact. This optimization is illustrated in the code snippet below (for the PageRank algorithm):

```
class Mapper
   method Initialize
      H = new AssociativeArray
   method Map(id n, object N)
      p = N.PageRank / N.OutgoingRelations.size()
      Emit(id n, object N)
      for all id m in N.OutgoingRelations do
         H{m} = H{m} + p
   method Close
      for all id n in H do
         Emit(id n, value H{n})

class Reducer
   method Reduce(id m, [s1, s2,...])
      M = null
      p = 0
      for all s in [s1, s2,...] do
          if IsObject(s) then
             M = s
          else
             p = p + s
      M.PageRank = p
      Emit(id m, item M)
```
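A toy single-process Python version of the simplified PageRank propagation (no damping factor, illustrative three-node graph):

```python
# Toy PageRank iteration: each node splits its rank over its outgoing links,
# and a node's new rank is the sum of the shares it receives.
graph = {"a": ["b", "c"], "b": ["c"], "c": ["a"]}
rank = {n: 1.0 / len(graph) for n in graph}

def mapper(node, r):
    share = r / len(graph[node])     # weight sent over each outgoing link
    for m in graph[node]:
        yield m, share

def run_iteration(rank):
    incoming = {n: [] for n in graph}
    for n, r in rank.items():
        for m, share in mapper(n, r):
            incoming[m].append(share)
    # Reducer: the new rank is the sum of incoming shares.
    return {n: sum(shares) for n, shares in incoming.items()}

for _ in range(50):
    rank = run_iteration(rank)
print(round(sum(rank.values()), 2))  # total rank mass is conserved -> 1.0
```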

### Applications:

Graph Analysis, Web Indexing

## Distinct Values (Unique Items Counting)

Problem Statement: There is a set of records that contain fields F and G. Count the total number of unique values of field F for each subset of records that have the same G (grouped by G).

The problem can be a little bit generalized and formulated in terms of faceted search:

Problem Statement: There is a set of records. Each record has a field F and an arbitrary number of category labels G = {G1, G2, …}. Count the total number of unique values of field F for each value of any label. Example:

```
Record 1: F=1, G={a, b}
Record 2: F=2, G={a, d, e}
Record 3: F=1, G={b}
Record 4: F=3, G={a, b}

Result:
a -> 3   // F=1, F=2, F=3
b -> 2   // F=1, F=3
d -> 1   // F=2
e -> 1   // F=2
```

Solution I:

The first approach is to solve the problem in two stages. In the first stage, the Mapper emits dummy counters for each pair of F and G; the Reducer calculates the total number of occurrences for each such pair. The main goal of this phase is to guarantee the uniqueness of F values. In the second phase, pairs are grouped by G and the total number of items in each group is calculated.

Phase I:

```
class Mapper
   method Map(null, record [value f, categories [g1, g2,...]])
      for all category g in [g1, g2,...]
         Emit(record [g, f], count 1)

class Reducer
   method Reduce(record [g, f], counts [n1, n2, ...])
      Emit(record [g, f], null )
```

Phase II:

```
class Mapper
   method Map(record [g, f], null)
      Emit(value g, count 1)

class Reducer
   method Reduce(value g, counts [n1, n2,...])
      Emit(value g, sum( [n1, n2,...] ) )
```
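The two phases can be traced in plain Python on the example records above (a set stands in for the deduplication that the Phase I Reducer performs):

```python
from collections import defaultdict

# The four example records: (F value, list of category labels G).
records = [
    (1, ["a", "b"]),
    (2, ["a", "d", "e"]),
    (1, ["b"]),
    (3, ["a", "b"]),
]

# Phase I: emit ((g, f), 1); the Reducer output is the set of unique (g, f) pairs.
phase1 = {(g, f) for f, categories in records for g in categories}

# Phase II: map each unique (g, f) pair to (g, 1) and sum per category.
counts = defaultdict(int)
for g, f in phase1:
    counts[g] += 1

print(counts["a"], counts["b"], counts["d"], counts["e"])  # -> 3 2 1 1
```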

Solution II:

The second solution requires only one MapReduce job, but it is not really scalable and its applicability is limited. The algorithm is simple – the Mapper emits values and categories, the Reducer excludes duplicates from the list of categories for each value and increments a counter for each category. The final step is to sum all the counters emitted by the Reducer. This approach is applicable if the number of records with the same f value is not very high and the total number of categories is also limited. For instance, this approach is applicable for the processing of web logs and the classification of users – the total number of users is high, but the number of events for one user is limited, as is the number of categories to classify by. It is worth noting that Combiners can be used in this schema to exclude duplicates from the category lists before the data is transmitted to the Reducers.

```
class Mapper
   method Map(null, record [value f, categories [g1, g2,...] ])
      for all category g in [g1, g2,...]
          Emit(value f, category g)

class Reducer
   method Initialize
      H = new AssociativeArray : category -> count
   method Reduce(value f, categories [g1, g2,...])
      [g1', g2',...] = ExcludeDuplicates( [g1, g2,...] )
      for all category g in [g1', g2',...]
         H{g} = H{g} + 1
   method Close
      for all category g in H do
         Emit(category g, count H{g})
```

### Applications:

Log Analysis, Unique Users Counting

## Cross-Correlation

Problem Statement: There is a set of tuples of items. For each possible pair of items, calculate the number of tuples where these items co-occur. If the total number of items is N, then N*N values should be reported.

This problem appears in text analysis (say, items are words and tuples are sentences) and in market analysis (customers who buy this tend to also buy that). If N*N is small enough that such a matrix can fit in the memory of a single machine, then the implementation is straightforward.

### Pairs Approach

The first approach is to emit all pairs with dummy counters from the Mappers and sum these counters on the Reducers. The shortcomings are:

• The benefit from Combiners is limited, as it is likely that all pairs are distinct.
• There is no in-memory accumulation.

```
class Mapper
   method Map(null, items [i1, i2,...] )
      for all item i in [i1, i2,...]
         for all item j in [i1, i2,...]
            Emit(pair [i j], count 1)

class Reducer
   method Reduce(pair [i j], counts [c1, c2,...])
      s = sum([c1, c2,...])
      Emit(pair [i j], count s)
```

### Stripes Approach

The second approach is to group the data by the first item in the pair and maintain an associative array (“stripe”) in which the counters for all adjacent items are accumulated. The Reducer receives all stripes for a leading item i, merges them, and emits the same result as in the Pairs approach.

• Generates fewer intermediate keys, hence the framework has less sorting to do.
• Greatly benefits from Combiners.
• Performs in-memory accumulation, which can lead to problems if not properly implemented.
• More complex implementation.
• In general, “stripes” is faster than “pairs”.

```
class Mapper
   method Map(null, items [i1, i2,...] )
      for all item i in [i1, i2,...]
         H = new AssociativeArray : item -> counter
         for all item j in [i1, i2,...]
            H{j} = H{j} + 1
         Emit(item i, stripe H)

class Reducer
   method Reduce(item i, stripes [H1, H2,...])
      H = new AssociativeArray : item -> counter
      H = merge-sum( [H1, H2,...] )
      for all item j in H.keys()
         Emit(pair [i j], H{j})
```
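Both approaches can be compared in a short single-process Python sketch (the tuples are illustrative, and self-pairs are skipped here, unlike the generic pseudocode above):

```python
from collections import Counter, defaultdict

tuples = [["a", "b", "c"], ["a", "b"], ["b", "c"]]

# Pairs approach: emit every co-occurring pair with a dummy counter, then sum.
pairs = Counter(
    (i, j)
    for items in tuples
    for i in items
    for j in items
    if i != j          # skip self-pairs
)

# Stripes approach: one associative array per leading item, merged per item.
stripes = defaultdict(Counter)
for items in tuples:
    for i in items:
        for j in items:
            if i != j:
                stripes[i][j] += 1

# Both approaches yield the same co-occurrence counts.
print(pairs[("a", "b")], stripes["a"]["b"])  # -> 2 2
```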

### Applications:

Text Analysis, Market Analysis

### References:

1. J. Lin and C. Dyer. Data-Intensive Text Processing with MapReduce.

# Relational MapReduce Patterns

In this section we go through the main relational operators and discuss how these operators can be implemented in MapReduce terms.

## Selection

```
class Mapper
   method Map(rowkey key, tuple t)
      if t satisfies the predicate
         Emit(tuple t, null)
```

## Projection

Projection is just a little bit more complex than selection; we should use a Reducer in this case to eliminate possible duplicates.

```
class Mapper
   method Map(rowkey key, tuple t)
      tuple g = project(t)  // extract required fields to tuple g
      Emit(tuple g, null)

class Reducer
   method Reduce(tuple t, array n)   // n is an array of nulls
      Emit(tuple t, null)
```

## Union

Mappers are fed all records of the two sets to be united. The Reducer is used to eliminate duplicates.

```
class Mapper
   method Map(rowkey key, tuple t)
      Emit(tuple t, null)

class Reducer
   method Reduce(tuple t, array n)   // n is an array of one or two nulls
      Emit(tuple t, null)
```

## Intersection

Mappers are fed all records of the two sets to be intersected. The Reducer emits only the records that occurred twice. This is possible only if both sets contain the record, because a record includes a primary key and can occur in each set only once.

```
class Mapper
   method Map(rowkey key, tuple t)
      Emit(tuple t, null)

class Reducer
   method Reduce(tuple t, array n)   // n is an array of one or two nulls
      if n.size() = 2
          Emit(tuple t, null)
```

## Difference

Suppose we have two sets of records, R and S, and we want to compute the difference R – S. The Mapper emits all tuples, each with a tag, which is the name of the set the record came from. The Reducer emits only the records that came from R but not from S.

```
class Mapper
   method Map(rowkey key, tuple t)
      Emit(tuple t, string t.SetName)    // t.SetName is either 'R' or 'S'

class Reducer
   method Reduce(tuple t, array n)    // n can be ['R'], ['S'], ['R', 'S'], or ['S', 'R']
      if n.size() = 1 and n = ['R']
          Emit(tuple t, null)
```

## GroupBy and Aggregation

Grouping and aggregation can be performed in one MapReduce job as follows. The Mapper extracts from each tuple the values to group by and aggregate, and emits them. The Reducer receives the values to be aggregated, already grouped, and calculates the aggregation function. Typical aggregation functions like sum or max can be calculated in a streaming fashion, and hence don’t require handling all the values simultaneously. Nevertheless, in some cases a two-phase MapReduce job may be required – see the Distinct Values pattern as an example.

```
class Mapper
   method Map(null, tuple [value GroupBy, value AggregateBy, value ...])
      Emit(value GroupBy, value AggregateBy)

class Reducer
   method Reduce(value GroupBy, [v1, v2,...])
      Emit(value GroupBy, aggregate( [v1, v2,...] ) )  // aggregate() : sum(), max(),...
```
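A single-process Python trace of this pattern with sum as the aggregation function (the rows are illustrative; a dictionary stands in for the shuffle):

```python
from collections import defaultdict

# Rows of (GroupBy value, AggregateBy value).
rows = [("x", 10), ("y", 5), ("x", 7), ("y", 1)]

groups = defaultdict(list)
for group_key, value in rows:       # Mapper: emit (GroupBy, AggregateBy)
    groups[group_key].append(value)

# Reducer: apply the aggregation function to each already-grouped list.
result = {k: sum(vs) for k, vs in groups.items()}
print(result["x"], result["y"])  # -> 17 6
```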

## Joining

Joins are perfectly possible in the MapReduce framework, but there exist a number of techniques that differ in efficiency and in the data volumes they are oriented toward. In this section we study some basic approaches. The references section contains links to detailed studies of join techniques.

### Repartition Join (Reduce Join, Sort-Merge Join)

This algorithm joins two sets, R and L, on some key k. The Mapper goes through all tuples from R and L, extracts the key k from each tuple, marks the tuple with a tag that indicates which set it came from (‘R’ or ‘L’), and emits the tagged tuple using k as a key. The Reducer receives all tuples for a particular key k and puts them into two buckets – one for R and one for L. When the two buckets are filled, the Reducer runs a nested loop over them and emits a cross-join of the buckets. Each emitted tuple is a concatenation of an R-tuple, an L-tuple, and the key k. This approach has the following disadvantages:

• The Mapper emits absolutely all data, even for keys that occur only in one set and have no pair in the other.
• The Reducer should hold all data for one key in memory. If the data doesn’t fit in memory, it is the Reducer’s responsibility to handle this by some kind of swap.

Nevertheless, Repartition Join is the most generic technique and can be successfully used when other, optimized techniques are not applicable.

```
class Mapper
   method Map(null, tuple [join_key k, value v1, value v2,...])
      Emit(join_key k, tagged_tuple [set_name tag, values [v1, v2, ...] ] )

class Reducer
   method Reduce(join_key k, tagged_tuples [t1, t2,...])
      H = new AssociativeArray : set_name -> values
      for all tagged_tuple t in [t1, t2,...]     // separate values into 2 arrays
         H{t.tag}.add(t.values)
      for all values r in H{'R'}                 // produce a cross-join of the two arrays
         for all values l in H{'L'}
            Emit(null, [k r l] )
```
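The tagging and bucketing can be traced in plain Python (a single-process sketch with illustrative tuples; a dictionary stands in for the shuffle by join key):

```python
from collections import defaultdict

R = [(1, "r1"), (2, "r2")]          # tuples of (join key, payload)
L = [(1, "l1"), (1, "l2"), (3, "l3")]

# Mapper: tag every tuple with its set name and emit it under the join key.
tagged = defaultdict(lambda: {"R": [], "L": []})
for k, v in R:
    tagged[k]["R"].append(v)
for k, v in L:
    tagged[k]["L"].append(v)

# Reducer: cross-join the two buckets collected for each key.
joined = [
    (k, r, l)
    for k, buckets in tagged.items()
    for r in buckets["R"]
    for l in buckets["L"]
]
print(sorted(joined))  # -> [(1, 'r1', 'l1'), (1, 'r1', 'l2')]
```

Note that keys 2 and 3, which occur in only one set, still travel through the shuffle but produce no output, which is the first disadvantage listed above.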

### Replicated Join (Map Join, Hash Join)

In practice, it is typical to join a small set with a large one (say, a list of users with a list of log records). Let’s assume that we join two sets, R and L, where R is relatively small. If so, R can be distributed to all Mappers, and each Mapper can load it and index it by the join key. The most common and efficient indexing technique here is a hash table. After this, the Mapper goes through the tuples of the set L and joins them with the corresponding tuples from R that are stored in the hash table. This approach is very effective because there is no need for sorting or transmission of the set L over the network, but the set R should be small enough to be distributed to all the Mappers.

```
class Mapper
   method Initialize
      H = new AssociativeArray : join_key -> tuple from R
      R = loadR()
      for all [ join_key k, tuple [r1, r2,...] ] in R
         H{k} = H{k}.append( [r1, r2,...] )

   method Map(join_key k, tuple l)
      for all tuple r in H{k}
         Emit(null, tuple [k r l] )
```
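A single-process Python sketch of the same idea, with an in-memory dictionary standing in for the replicated copy of R (the tuples are illustrative):

```python
from collections import defaultdict

# R is the small set, loaded into a hash table on every Mapper.
R = [(1, "alice"), (2, "bob")]
L = [(1, "login"), (2, "logout"), (1, "click"), (9, "noise")]

H = defaultdict(list)
for k, r in R:
    H[k].append(r)                  # index the small set by the join key

def map_l(k, l):
    # Join each L-tuple against the in-memory copy of R; no shuffle is needed.
    return [(k, r, l) for r in H[k]]

joined = [t for k, l in L for t in map_l(k, l)]
print(len(joined))  # -> 3 (the L-tuple with key 9 has no match in R)
```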