Good articles to learn map reduce
Basics of Map Reduce Algorithm Explained with a Simple Example
From: http://www.thegeekstuff.com/2014/05/mapreducealgorithm/
When processing a large data set, the application code has to be both scalable and efficient.
The map reduce algorithm (or flow) is highly effective at handling big data.
Let us take a simple example and use map reduce to solve a problem.
Say you are processing a large amount of data and trying to find out what percentage of your user base was talking about games.
First, we will identify the keywords that we are going to map from the data to conclude that a message is related to games.
Next, we will write a mapping function to identify such patterns in our data. For example, the keywords can be Gold medals, Bronze medals, Silver medals, Olympic football, basketball, cricket, etc.
Let us take the following chunks from a big data set and see how to process them.
“Hi, how are you”
“We love football”
“He is an awesome football player”
“Merry Christmas”
“Olympics will be held in China”
“Records broken today in Olympics”
“Yes, we won 2 Gold medals”
“He qualified for Olympics”
Mapping Phase
So the map phase of our algorithm will be as follows:
1. Declare a function “Map”
2. Loop: for each word equal to “football”
3. Increment the counter
4. Return the key-value pair “football”=>counter
In the same way, we can define any number of mapping functions for other words: “Olympics”, “Gold Medals”, “cricket”, etc.
Reducing Phase
The reducing function accepts the input from all these mappers in the form of key-value pairs and then processes it. So, the input to the reduce function will look like the following:
 reduce(“football”=>2)
 reduce(“Olympics”=>3)
Our algorithm will continue with the following steps:
5. Declare a function reduce to accept the values from the map function.
6. For each key-value pair, add the value to the counter.
7. Return “games”=>counter.
At the end, we will get output like “games”=>5.
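The seven steps above can be sketched in Python. This is only a minimal single-process illustration, not a distributed implementation; the helper names make_mapper and reduce_games are made up for this example, and only the “football” and “Olympics” mappers are run so that the result matches the “games”=>5 output above.

```python
# Sample chunks copied from the example above.
CHUNKS = [
    "Hi, how are you",
    "We love football",
    "He is an awesome football player",
    "Merry Christmas",
    "Olympics will be held in China",
    "Records broken today in Olympics",
    "Yes, we won 2 Gold medals",
    "He qualified for Olympics",
]


def make_mapper(keyword):
    """Return a mapper that counts the words equal to `keyword` (hypothetical helper)."""
    def mapper(chunks):
        counter = 0
        for chunk in chunks:
            for word in chunk.split():
                if word.lower() == keyword.lower():
                    counter += 1
        return keyword, counter
    return mapper


def reduce_games(pairs):
    """Add up the counters of all game-related key-value pairs."""
    counter = 0
    for _keyword, value in pairs:
        counter += value
    return "games", counter


# One mapper per keyword; in a real cluster these would run in parallel.
mapped = [make_mapper(keyword)(CHUNKS) for keyword in ("football", "Olympics")]
result = reduce_games(mapped)
print(mapped)  # [('football', 2), ('Olympics', 3)]
print(result)  # ('games', 5)
```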
Now, looking at the bigger picture, we can write any number of mapper functions here. Let us say that you want to know which users were wishing each other. In this case you will write a mapping function to map words like “Wishing”, “Wish”, “Happy”, “Merry” and then write a corresponding reducer function.
Here you will need one function for shuffling, which will distinguish between the “games” and “wishing” keys returned by the mappers and send each to the respective reducer function.
Similarly, you may need a function for splitting the input up front, to feed the mapper functions with chunks.
Flow of Map Reduce Algorithm
The following diagram summarizes the flow of the map reduce algorithm:
In the above map reduce flow:
 The input data can be divided into any number of chunks, depending on the amount of data and the processing capacity of an individual unit.
 Next, the chunks are passed to the mapper functions. Note that all the chunks are processed simultaneously, which is what enables parallel processing of the data.
 After that, shuffling happens, which groups similar patterns together.
 Finally, the reducers combine them all to produce a consolidated output as per the logic.
 This algorithm scales well: depending on the size of the input data, we can keep increasing the number of parallel processing units.
MapReduce is a framework originally developed at Google that allows for easy large scale distributed computing across a number of domains. Apache Hadoop is an open source implementation.
I’ll gloss over the details, but it comes down to defining two functions: a map function and a reduce function. The map function takes a value and outputs key:value pairs. For instance, if we define a map function that takes a string and outputs the length of the word as the key and the word itself as the value, then map(steve) would return 5:steve and map(savannah) would return 8:savannah. You may have noticed that the map function is stateless and only requires the input value to compute its output value. This allows us to run the map function against values in parallel, which provides a huge advantage. Before we get to the reduce function, the mapreduce framework groups all of the values together by key, so if the map functions output the following key:value pairs:
3 : the
3 : and
3 : you
4 : then
4 : what
4 : when
5 : steve
5 : where
8 : savannah
8 : research
They get grouped as:
3 : [the, and, you]
4 : [then, what, when]
5 : [steve, where]
8 : [savannah, research]
Each of these lines would then be passed as an argument to the reduce function, which accepts a key and a list of values. In this instance, we might be trying to figure out how many words of certain lengths exist, so our reduce function will just count the number of items in the list and output the key with the size of the list, like:
3 : 3
4 : 3
5 : 2
8 : 2
The reductions can also be done in parallel, again providing a huge advantage. We can then look at these final results and see that there were only two words of length 5 in our corpus, etc…
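The word-length example can be reproduced with a short Python sketch. It runs in a single process and only imitates the three stages (map, group by key, reduce); the function names map_word, group_by_key and reduce_count are assumptions made for this illustration.

```python
from collections import defaultdict


def map_word(word):
    """Map step: emit (length of the word, the word itself)."""
    return len(word), word


def group_by_key(pairs):
    """Grouping step done by the framework: collect all values per key."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)


def reduce_count(key, values):
    """Reduce step: count how many words share this length."""
    return key, len(values)


words = ["the", "and", "you", "then", "what", "when",
         "steve", "where", "savannah", "research"]
pairs = [map_word(word) for word in words]
grouped = group_by_key(pairs)
counts = dict(reduce_count(key, values) for key, values in grouped.items())
print(grouped[5])  # ['steve', 'where']
print(counts)      # {3: 3, 4: 3, 5: 2, 8: 2}
```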
The most common example of mapreduce is for counting the number of times words occur in a corpus. Suppose you had a copy of the internet (I’ve been fortunate enough to have worked in such a situation), and you wanted a list of every word on the internet as well as how many times it occurred.
The way you would approach this would be to tokenize the documents you have (break them into words), and pass each word to a mapper. The mapper would then spit the word back out along with a value of 1. The grouping phase will take all the keys (in this case words), and make a list of 1’s. The reduce phase then takes a key (the word) and a list (a list of 1’s for every time the key appeared on the internet), and sums the list. The reducer then outputs the word, along with its count. When all is said and done you’ll have a list of every word on the internet, along with how many times it appeared.
Easy, right? If you’ve ever read about mapreduce, the above scenario isn’t anything new… it’s the “Hello, World” of mapreduce. So here is a real world use case (Facebook may or may not actually do the following, it’s just an example):
Facebook has a list of friends (note that friends are a bidirectional thing on Facebook. If I’m your friend, you’re mine). They also have lots of disk space and they serve hundreds of millions of requests everyday. They’ve decided to precompute calculations when they can to reduce the processing time of requests. One common processing request is the “You and Joe have 230 friends in common” feature. When you visit someone’s profile, you see a list of friends that you have in common. This list doesn’t change frequently so it’d be wasteful to recalculate it every time you visited the profile (sure you could use a decent caching strategy, but then I wouldn’t be able to continue writing about mapreduce for this problem). We’re going to use mapreduce so that we can calculate everyone’s common friends once a day and store those results. Later on it’s just a quick lookup. We’ve got lots of disk, it’s cheap.
Assume the friends are stored as Person->[List of Friends]. Our friends list is then:
A -> B C D
B -> A C D E
C -> A B D E
D -> A B C E
E -> B C D
Each line will be an argument to a mapper. For every friend in the list of friends, the mapper will output a key-value pair. The key will be a friend along with the person. The value will be the list of friends. The key will be sorted so that the friends are in order, causing all pairs of friends to go to the same reducer. This is hard to explain with text, so let’s just do it and see if you can see the pattern. After all the mappers are done running, you’ll have a list like this:
For map(A -> B C D) :
(A B) -> B C D
(A C) -> B C D
(A D) -> B C D
For map(B -> A C D E) : (Note that A comes before B in the key)
(A B) -> A C D E
(B C) -> A C D E
(B D) -> A C D E
(B E) -> A C D E
For map(C -> A B D E) :
(A C) -> A B D E
(B C) -> A B D E
(C D) -> A B D E
(C E) -> A B D E
For map(D -> A B C E) :
(A D) -> A B C E
(B D) -> A B C E
(C D) -> A B C E
(D E) -> A B C E
And finally for map(E -> B C D):
(B E) -> B C D
(C E) -> B C D
(D E) -> B C D
Before we send these key-value pairs to the reducers, we group them by their keys and get:
(A B) -> (A C D E) (B C D)
(A C) -> (A B D E) (B C D)
(A D) -> (A B C E) (B C D)
(B C) -> (A B D E) (A C D E)
(B D) -> (A B C E) (A C D E)
(B E) -> (A C D E) (B C D)
(C D) -> (A B C E) (A B D E)
(C E) -> (A B D E) (B C D)
(D E) -> (A B C E) (B C D)
Each line will be passed as an argument to a reducer. The reduce function will simply intersect the lists of values and output the same key with the result of the intersection. For example, reduce((A B) -> (A C D E) (B C D)) will output (A B) : (C D) and means that friends A and B have C and D as common friends.
The result after reduction is:
(A B) -> (C D)
(A C) -> (B D)
(A D) -> (B C)
(B C) -> (A D E)
(B D) -> (A C E)
(B E) -> (C D)
(C D) -> (A B E)
(C E) -> (B D)
(D E) -> (B C)
Now when D visits B’s profile, we can quickly look up (B D) and see that they have three friends in common, (A C E).
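The common-friends computation can be sketched in Python as follows. This is a single-process illustration that uses the same five people as above; the names map_friends and reduce_common are made up for the sketch, and a plain dictionary stands in for the framework's grouping phase.

```python
from collections import defaultdict

friends = {
    "A": ["B", "C", "D"],
    "B": ["A", "C", "D", "E"],
    "C": ["A", "B", "D", "E"],
    "D": ["A", "B", "C", "E"],
    "E": ["B", "C", "D"],
}


def map_friends(person, friend_list):
    """Emit one (sorted pair, friend list) record per friend of `person`."""
    for friend in friend_list:
        key = tuple(sorted((person, friend)))  # sorting sends both sides to one reducer
        yield key, friend_list


def reduce_common(key, friend_lists):
    """Intersect the friend lists that arrived for one pair."""
    common = set(friend_lists[0])
    for friend_list in friend_lists[1:]:
        common &= set(friend_list)
    return key, sorted(common)


# Grouping phase: collect all values emitted for the same key.
grouped = defaultdict(list)
for person, friend_list in friends.items():
    for key, value in map_friends(person, friend_list):
        grouped[key].append(value)

common_friends = dict(reduce_common(key, values) for key, values in grouped.items())
print(common_friends[("B", "D")])  # ['A', 'C', 'E']
```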
MAPREDUCE PATTERNS, ALGORITHMS, AND USE CASES
From: https://highlyscalable.wordpress.com/2012/02/01/mapreducepatterns/
In this article I digested a number of MapReduce patterns and algorithms to give a systematic view of the different techniques that can be found on the web or in scientific articles. Several practical case studies are also provided. All descriptions and code snippets use Hadoop’s standard MapReduce model with Mappers, Reducers, Combiners, Partitioners, and sorting. This framework is depicted in the figure below.
Basic MapReduce Patterns
Counting and Summing
Problem Statement: There are a number of documents where each document is a set of terms. It is required to calculate the total number of occurrences of each term in all documents. Alternatively, it can be an arbitrary function of the terms. For instance, there is a log file where each record contains a response time and it is required to calculate the average response time.
Solution:
Let’s start with something really simple. The code snippet below shows a Mapper that simply emits “1” for each term it processes and a Reducer that goes through the lists of ones and sums them up:
class Mapper
   method Map(docid id, doc d)
      for all term t in doc d do
         Emit(term t, count 1)

class Reducer
   method Reduce(term t, counts [c1, c2,...])
      sum = 0
      for all count c in [c1, c2,...] do
         sum = sum + c
      Emit(term t, count sum)
The obvious disadvantage of this approach is the large number of dummy counters emitted by the Mapper. The Mapper can decrease the number of counters by summing the counters for each document:
class Mapper
   method Map(docid id, doc d)
      H = new AssociativeArray
      for all term t in doc d do
         H{t} = H{t} + 1
      for all term t in H do
         Emit(term t, count H{t})
In order to accumulate counters not only for one document, but for all documents processed by one Mapper node, it is possible to leverage Combiners:
class Mapper
   method Map(docid id, doc d)
      for all term t in doc d do
         Emit(term t, count 1)

class Combiner
   method Combine(term t, [c1, c2,...])
      sum = 0
      for all count c in [c1, c2,...] do
         sum = sum + c
      Emit(term t, count sum)

class Reducer
   method Reduce(term t, counts [c1, c2,...])
      sum = 0
      for all count c in [c1, c2,...] do
         sum = sum + c
      Emit(term t, count sum)
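To see what the Combiner buys, here is a small Python sketch that imitates two Mapper nodes in one process and counts how many records cross the "network" with and without local combining. The node layout, the sample documents and the function names are all assumptions made for the illustration; this is not Hadoop API code.

```python
from collections import Counter, defaultdict


def map_doc(doc_id, text):
    """Mapper: emit (term, 1) for every term in the document."""
    for term in text.split():
        yield term, 1


def combine(pairs):
    """Combiner: sum the counters locally, per Mapper node."""
    local = Counter()
    for term, count in pairs:
        local[term] += count
    return list(local.items())


def reduce_term(term, counts):
    """Reducer: sum the (already partially summed) counters."""
    return term, sum(counts)


# Two hypothetical Mapper nodes, each holding some documents.
node_docs = [
    {1: "a b a", 2: "b c"},
    {3: "a c c"},
]

shuffled = defaultdict(list)
emitted_without_combiner = 0
emitted_with_combiner = 0
for docs in node_docs:
    node_pairs = [pair for doc_id, text in docs.items() for pair in map_doc(doc_id, text)]
    emitted_without_combiner += len(node_pairs)
    combined = combine(node_pairs)
    emitted_with_combiner += len(combined)
    for term, count in combined:
        shuffled[term].append(count)

totals = dict(reduce_term(term, counts) for term, counts in shuffled.items())
print(totals)                                           # {'a': 3, 'b': 2, 'c': 3}
print(emitted_without_combiner, emitted_with_combiner)  # 8 5
```

The Reducer is unchanged because summing is associative, so it does not matter whether it receives raw ones or partial sums.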
Applications:
Log Analysis, Data Querying
Collating
Problem Statement: There is a set of items and some function of one item. It is required to save all items that have the same value of the function into one file or perform some other computation that requires all such items to be processed as a group. The most typical example is building inverted indexes.
Solution:
The solution is straightforward. The Mapper computes the given function for each item and emits the value of the function as a key and the item itself as a value. The Reducer obtains all items grouped by function value and processes or saves them. In the case of inverted indexes, items are terms (words) and the function is the document ID where the term was found.
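As an illustration of collating, the sketch below builds a tiny inverted index in Python. It assumes that each input item is an occurrence of a term in a document and that the grouping key is the term, which is the usual inverted-index layout; the sample documents and function names are made up for the example.

```python
from collections import defaultdict

# Hypothetical input: document ID -> text.
docs = {
    "d1": "map reduce",
    "d2": "reduce join",
    "d3": "map join sort",
}


def map_items(doc_id, text):
    """Mapper: emit the grouping key (the term) with the item (the document ID)."""
    for term in text.split():
        yield term, doc_id


def reduce_postings(term, doc_ids):
    """Reducer: all document IDs for one term arrive together; save them as a posting list."""
    return term, sorted(set(doc_ids))


grouped = defaultdict(list)
for doc_id, text in docs.items():
    for term, value in map_items(doc_id, text):
        grouped[term].append(value)

index = dict(reduce_postings(term, doc_ids) for term, doc_ids in grouped.items())
print(index["map"])   # ['d1', 'd3']
print(index["join"])  # ['d2', 'd3']
```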
Applications:
Inverted Indexes, ETL
Filtering (“Grepping”), Parsing, and Validation
Problem Statement: There is a set of records and it is required to collect all records that meet some condition or transform each record (independently from other records) into another representation. The latter case includes such tasks as text parsing and value extraction, and conversion from one format to another.
Solution: The solution is absolutely straightforward – the Mapper takes records one by one and emits accepted items or their transformed versions.
Applications:
Log Analysis, Data Querying, ETL, Data Validation
Distributed Task Execution
Problem Statement: There is a large computational problem that can be divided into multiple parts and results from all parts can be combined together to obtain a final result.
Solution: The problem description is split into a set of specifications, and the specifications are stored as input data for Mappers. Each Mapper takes a specification, performs the corresponding computations and emits results. The Reducer combines all emitted parts into the final result.
Case Study: Simulation of a Digital Communication System
There is a software simulator of a digital communication system like WiMAX that passes some volume of random data through the system model and computes the error probability of the throughput. Each Mapper runs the simulation for a specified amount of data, which is 1/Nth of the required sampling, and emits the error rate. The Reducer computes the average error rate.
Applications:
Physical and Engineering Simulations, Numerical Analysis, Performance Testing
Sorting
Problem Statement: There is a set of records and it is required to sort these records by some rule or process these records in a certain order.
Solution: Simple sorting is absolutely straightforward – Mappers just emit all items as values associated with the sorting keys, which are assembled as a function of the items. Nevertheless, in practice sorting is often used in quite tricky ways, which is why it is said to be the heart of MapReduce (and Hadoop). In particular, it is very common to use composite keys to achieve secondary sorting and grouping.
Sorting in MapReduce is originally intended for sorting of the emitted key-value pairs by key, but there exist techniques that leverage Hadoop implementation specifics to achieve sorting by values. See this blog for more details.
It is worth noting that if MapReduce is used for sorting of the original (not intermediate) data, it is often a good idea to continuously maintain data in sorted state using BigTable concepts. In other words, it can be more efficient to sort data once during insertion than sort them for each MapReduce query.
Applications:
ETL, Data Analysis
Not-So-Basic MapReduce Patterns
Iterative Message Passing (Graph Processing)
Problem Statement: There is a network of entities and relationships between them. It is required to calculate a state of each entity on the basis of properties of the other entities in its neighborhood. This state can represent a distance to other nodes, indication that there is a neighbor with the certain properties, characteristic of neighborhood density and so on.
Solution: A network is stored as a set of nodes, and each node contains a list of adjacent node IDs. Conceptually, MapReduce jobs are performed iteratively, and at each iteration each node sends messages to its neighbors. Each neighbor updates its state on the basis of the received messages. Iterations are terminated by some condition, like a fixed maximal number of iterations (say, the network diameter) or negligible changes in states between two consecutive iterations. From the technical point of view, the Mapper emits messages for each node using the ID of the adjacent node as a key. As a result, all messages are grouped by the incoming node, and the Reducer is able to recompute the state and rewrite the node with the new state. This algorithm is shown in the figure below:
class Mapper
   method Map(id n, object N)
      Emit(id n, object N)
      for all id m in N.OutgoingRelations do
         Emit(id m, message getMessage(N))

class Reducer
   method Reduce(id m, [s1, s2,...])
      M = null
      messages = []
      for all s in [s1, s2,...] do
         if IsObject(s) then
            M = s
         else               // s is a message
            messages.add(s)
      M.State = calculateState(messages)
      Emit(id m, item M)
It should be emphasized that the state of one node rapidly propagates across the whole network if the network is not too sparse, because all nodes that were “infected” by this state start to “infect” all their neighbors. This process is illustrated in the figure below:
Case Study: Availability Propagation Through The Tree of Categories
Problem Statement: This problem is inspired by a real-life e-commerce task. There is a tree of categories that branches out from large categories (like Men, Women, Kids) to smaller ones (like Men Jeans or Women Dresses), and eventually to small end-of-line categories (like Men Blue Jeans). An end-of-line category is either available (contains products) or not. A high-level category is available if there is at least one available end-of-line category in its subtree. The goal is to calculate availabilities for all categories if the availabilities of the end-of-line categories are known.
Solution: This problem can be solved using the framework that was described in the previous section. We define getMessage and calculateState methods as follows:
class N
   State in {True = 2, False = 1, null = 0},
   initialized 1 or 2 for end-of-line categories, 0 otherwise
   method getMessage(object N)
      return N.State
   method calculateState(state s, data [d1, d2,...])
      return max( [d1, d2,...] )
Case Study: Breadth-First Search
Problem Statement: There is a graph and it is required to calculate the distance (the number of hops) from one source node to all other nodes in the graph.
Solution: The source node emits 0 to all its neighbors, and these neighbors propagate this counter, incrementing it by 1 during each hop:
class N
   State is distance, initialized 0 for source node, INFINITY for all other nodes
   method getMessage(N)
      return N.State + 1
   method calculateState(state s, data [d1, d2,...])
      return min( [d1, d2,...] )
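A runnable Python sketch of this breadth-first search is given below. It imitates the iterative Mapper/Reducer scheme in a single process. Two things are assumptions of the sketch rather than part of the pseudocode: the small example graph, and the fact that the reducer also keeps the node's current distance as a candidate for the minimum, so that the source node stays at 0 and nodes that receive no messages keep their state.

```python
import math
from collections import defaultdict


def bfs_iteration(nodes):
    """One MapReduce round: map emits node objects and messages, reduce recomputes states."""
    shuffled = defaultdict(list)
    for node_id, node in nodes.items():                      # map phase
        shuffled[node_id].append(("node", node))             # pass the node object through
        for neighbor in node["out"]:
            shuffled[neighbor].append(("msg", node["state"] + 1))
    new_nodes = {}
    for node_id, values in shuffled.items():                 # reduce phase
        node = next(value for tag, value in values if tag == "node")
        messages = [value for tag, value in values if tag == "msg"]
        # Assumption: the current state takes part in the minimum.
        state = min([node["state"]] + messages)
        new_nodes[node_id] = {"state": state, "out": node["out"]}
    return new_nodes


def bfs(nodes):
    """Repeat rounds until no state changes between two consecutive iterations."""
    while True:
        updated = bfs_iteration(nodes)
        if updated == nodes:
            return {node_id: node["state"] for node_id, node in updated.items()}
        nodes = updated


# Hypothetical graph: A is the source; every node appears as a key.
graph = {
    "A": {"state": 0, "out": ["B", "C"]},
    "B": {"state": math.inf, "out": ["D"]},
    "C": {"state": math.inf, "out": ["D"]},
    "D": {"state": math.inf, "out": ["E"]},
    "E": {"state": math.inf, "out": []},
}
distances = bfs(graph)
print(distances)  # {'A': 0, 'B': 1, 'C': 1, 'D': 2, 'E': 3}
```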
Case Study: PageRank and Mapper-Side Data Aggregation
This algorithm was suggested by Google to calculate the relevance of a web page as a function of the authoritativeness (PageRank) of the pages that link to it. The real algorithm is quite complex, but at its core it is just a propagation of weights between nodes, where each node calculates its weight as the sum of the shares it receives over its incoming links:
class N
   State is PageRank
   method getMessage(object N)
      return N.State / N.OutgoingRelations.size()
   method calculateState(state s, data [d1, d2,...])
      return ( sum([d1, d2,...]) )
It is worth mentioning that the schema we use is too generic and doesn’t take advantage of the fact that the state is a numerical value. In most practical cases, we can perform aggregation of values on the Mapper side by virtue of this fact. This optimization is illustrated in the code snippet below (for the PageRank algorithm):
class Mapper
   method Initialize
      H = new AssociativeArray
   method Map(id n, object N)
      p = N.PageRank / N.OutgoingRelations.size()
      Emit(id n, object N)
      for all id m in N.OutgoingRelations do
         H{m} = H{m} + p
   method Close
      for all id n in H do
         Emit(id n, value H{n})

class Reducer
   method Reduce(id m, [s1, s2,...])
      M = null
      p = 0
      for all s in [s1, s2,...] do
         if IsObject(s) then
            M = s
         else
            p = p + s
      M.PageRank = p
      Emit(id m, item M)
Applications:
Graph Analysis, Web Indexing
Distinct Values (Unique Items Counting)
Problem Statement: There is a set of records that contain fields F and G. Count the total number of unique values of field F for each subset of records that have the same G (grouped by G).
The problem can be generalized a little and formulated in terms of faceted search:
Problem Statement: There is a set of records. Each record has a field F and an arbitrary number of category labels G = {G1, G2, …}. Count the total number of unique values of field F for each subset of records for each value of any label. Example:
Record 1: F=1, G={a, b}
Record 2: F=2, G={a, d, e}
Record 3: F=1, G={b}
Record 4: F=3, G={a, b}

Result:
a -> 3   // F=1, F=2, F=3
b -> 2   // F=1, F=3
d -> 1   // F=2
e -> 1   // F=2
Solution I:
The first approach is to solve the problem in two stages. At the first stage Mapper emits dummy counters for each pair of F and G; Reducer calculates a total number of occurrences for each such pair. The main goal of this phase is to guarantee uniqueness of F values. At the second phase pairs are grouped by G and the total number of items in each group is calculated.
Phase I:
class Mapper
   method Map(null, record [value f, categories [g1, g2,...]])
      for all category g in [g1, g2,...]
         Emit(record [g, f], count 1)

class Reducer
   method Reduce(record [g, f], counts [n1, n2, ...])
      Emit(record [g, f], null )
Phase II:
class Mapper
   method Map(record [g, f], null)
      Emit(value g, count 1)

class Reducer
   method Reduce(value g, counts [n1, n2,...])
      Emit(value g, sum( [n1, n2,...] ) )
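The two phases can be chained in a short Python sketch that uses the four example records from the problem statement. The run_job helper is a made-up, in-memory stand-in for a MapReduce job (map, group by key, reduce) and is not part of any framework.

```python
from collections import defaultdict

# The example records: (F value, list of category labels G).
records = [
    (1, ["a", "b"]),
    (2, ["a", "d", "e"]),
    (1, ["b"]),
    (3, ["a", "b"]),
]


def run_job(inputs, mapper, reducer):
    """Tiny in-memory stand-in for one MapReduce job (hypothetical helper)."""
    groups = defaultdict(list)
    for item in inputs:
        for key, value in mapper(item):
            groups[key].append(value)
    return [reducer(key, values) for key, values in groups.items()]


def phase1_map(record):
    f, categories = record
    for g in categories:
        yield (g, f), 1


def phase1_reduce(pair, counts):
    return pair  # one output per unique (g, f) pair, duplicates collapse here


def phase2_map(pair):
    g, _f = pair
    yield g, 1


def phase2_reduce(g, counts):
    return g, sum(counts)


unique_pairs = run_job(records, phase1_map, phase1_reduce)
distinct_counts = dict(run_job(unique_pairs, phase2_map, phase2_reduce))
print(distinct_counts)  # {'a': 3, 'b': 2, 'd': 1, 'e': 1}
```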
Solution II:
The second solution requires only one MapReduce job, but it is not really scalable and its applicability is limited. The algorithm is simple – the Mapper emits values and categories, the Reducer excludes duplicates from the list of categories for each value and increments counters for each category. The final step is to sum all counters emitted by the Reducer. This approach is applicable if the number of records with the same f value is not very high and the total number of categories is also limited. For instance, this approach is applicable for processing web logs and classifying users – the total number of users is high, but the number of events for one user is limited, as is the number of categories to classify by. It is worth noting that Combiners can be used in this schema to exclude duplicates from category lists before data is transmitted to the Reducer.
class Mapper
   method Map(null, record [value f, categories [g1, g2,...]])
      for all category g in [g1, g2,...]
         Emit(value f, category g)

class Reducer
   method Initialize
      H = new AssociativeArray : category -> count
   method Reduce(value f, categories [g1, g2,...])
      [g1', g2',...] = ExcludeDuplicates( [g1, g2,...] )
      for all category g in [g1', g2',...]
         H{g} = H{g} + 1
   method Close
      for all category g in H do
         Emit(category g, count H{g})
Applications:
Log Analysis, Unique Users Counting
Cross-Correlation
Problem Statement: There is a set of tuples of items. For each possible pair of items, calculate the number of tuples where these items co-occur. If the total number of items is N, then N*N values should be reported.
This problem appears in text analysis (say, items are words and tuples are sentences) and market analysis (customers who buy this tend to also buy that). If N*N is quite small and such a matrix can fit in the memory of a single machine, then implementation is straightforward.
Pairs Approach
The first approach is to emit all pairs and dummy counters from Mappers and sum these counters on Reducer. The shortcomings are:
 The benefit from combiners is limited, as it is likely that all pairs are distinct
 There is no in-memory accumulation
class Mapper
   method Map(null, items [i1, i2,...] )
      for all item i in [i1, i2,...]
         for all item j in [i1, i2,...]
            Emit(pair [i j], count 1)

class Reducer
   method Reduce(pair [i j], counts [c1, c2,...])
      s = sum([c1, c2,...])
      Emit(pair[i j], count s)
Stripes Approach
The second approach is to group data by the first item in the pair and maintain an associative array (“stripe”) where counters for all adjacent items are accumulated. The Reducer receives all stripes for the leading item i, merges them, and emits the same result as in the Pairs approach.
 Generates fewer intermediate keys. Hence the framework has less sorting to do.
 Greatly benefits from combiners.
 Performs in-memory accumulation. This can lead to problems if not properly implemented.
 More complex implementation.
 In general, “stripes” is faster than “pairs”
class Mapper
   method Map(null, items [i1, i2,...] )
      for all item i in [i1, i2,...]
         H = new AssociativeArray : item -> counter
         for all item j in [i1, i2,...]
            H{j} = H{j} + 1
         Emit(item i, stripe H)

class Reducer
   method Reduce(item i, stripes [H1, H2,...])
      H = new AssociativeArray : item -> counter
      H = merge-sum( [H1, H2,...] )
      for all item j in H.keys()
         Emit(pair [i j], H{j})
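The sketch below runs both approaches in Python on three made-up tuples, checks that they report the same counts, and compares how many intermediate records each one emits. It follows the pseudocode literally, so an item is also paired with itself; the function names and sample data are assumptions of the example.

```python
from collections import Counter, defaultdict

# Hypothetical input tuples (e.g. sentences as tuples of words).
tuples = [("a", "b"), ("a", "b", "c"), ("b", "c")]


def pairs_approach(tuples):
    """Emit ((i, j), 1) for every pair, then sum per pair in the reducer."""
    emitted = [((i, j), 1) for items in tuples for i in items for j in items]
    totals = Counter()
    for pair, count in emitted:
        totals[pair] += count
    return dict(totals), len(emitted)


def stripes_approach(tuples):
    """Emit one stripe (item -> counter) per leading item, then merge-sum the stripes."""
    emitted = []
    for items in tuples:
        for i in items:
            emitted.append((i, Counter(items)))
    merged = defaultdict(Counter)
    for i, stripe in emitted:
        merged[i].update(stripe)  # Counter.update adds counts
    totals = {(i, j): n for i, stripe in merged.items() for j, n in stripe.items()}
    return totals, len(emitted)


pair_counts, pair_records = pairs_approach(tuples)
stripe_counts, stripe_records = stripes_approach(tuples)
print(pair_counts[("a", "b")])       # 2
print(pair_records, stripe_records)  # 17 7
```

Each stripe record is larger than a pair record, so the record count alone overstates the saving; it mainly shows why the framework has fewer keys to sort.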
Applications:
Text Analysis, Market Analysis
References:
 Lin J. Dyer C. Hirst G. Data Intensive Processing MapReduce
Relational MapReduce Patterns
In this section we go through the main relational operators and discuss how these operators can be implemented in MapReduce terms.
Selection
class Mapper
   method Map(rowkey key, tuple t)
      if t satisfies the predicate
         Emit(tuple t, null)
Projection
Projection is just a little bit more complex than selection, but we should use a Reducer in this case to eliminate possible duplicates.
class Mapper
   method Map(rowkey key, tuple t)
      tuple g = project(t)   // extract required fields to tuple g
      Emit(tuple g, null)

class Reducer
   method Reduce(tuple t, array n)   // n is an array of nulls
      Emit(tuple t, null)
Union
Mappers are fed all records of the two sets to be united. The Reducer is used to eliminate duplicates.
class Mapper
   method Map(rowkey key, tuple t)
      Emit(tuple t, null)

class Reducer
   method Reduce(tuple t, array n)   // n is an array of one or two nulls
      Emit(tuple t, null)
Intersection
Mappers are fed all records of the two sets to be intersected. The Reducer emits only records that occurred twice. This is possible only if both sets contain the record, because a record includes the primary key and can occur in one set only once.
class Mapper
   method Map(rowkey key, tuple t)
      Emit(tuple t, null)

class Reducer
   method Reduce(tuple t, array n)   // n is an array of one or two nulls
      if n.size() = 2
         Emit(tuple t, null)
Difference
Suppose we have two sets of records – R and S. We want to compute the difference R – S. The Mapper emits all tuples together with a tag, which is the name of the set the record came from. The Reducer emits only records that came from R but not from S.
class Mapper
   method Map(rowkey key, tuple t)
      Emit(tuple t, string t.SetName)   // t.SetName is either 'R' or 'S'

class Reducer
   method Reduce(tuple t, array n)   // array n can be ['R'], ['S'], ['R' 'S'], or ['S', 'R']
      if n.size() = 1 and n[1] = 'R'
         Emit(tuple t, null)
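The difference operator can be tried out with a few lines of Python. The sketch runs in one process; the sample sets R and S and the helper names are made up, and, as in the pseudocode, it assumes a record occurs at most once per set.

```python
from collections import defaultdict

# Hypothetical input sets; each record includes its primary key.
R = [(1, "x"), (2, "y"), (3, "z")]
S = [(2, "y"), (4, "w")]


def map_tagged(set_name, rows):
    """Mapper: emit each tuple as the key, tagged with the set it came from."""
    for row in rows:
        yield row, set_name


def reduce_difference(row, tags):
    """Reducer: keep the tuple only if it was seen in R and nowhere else."""
    if tags == ["R"]:
        return row
    return None


groups = defaultdict(list)
for set_name, rows in (("R", R), ("S", S)):
    for row, tag in map_tagged(set_name, rows):
        groups[row].append(tag)

difference = [row for row, tags in groups.items()
              if reduce_difference(row, tags) is not None]
print(difference)  # [(1, 'x'), (3, 'z')]
```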
GroupBy and Aggregation
Grouping and aggregation can be performed in one MapReduce job as follows. The Mapper extracts from each tuple the values to group by and to aggregate, and emits them. The Reducer receives the values to be aggregated already grouped and calculates an aggregation function. Typical aggregation functions like sum or max can be calculated in a streaming fashion, hence they do not need to handle all values simultaneously. Nevertheless, in some cases a two-phase MapReduce job may be required – see the Distinct Values pattern as an example.
class Mapper
   method Map(null, tuple [value GroupBy, value AggregateBy, value ...])
      Emit(value GroupBy, value AggregateBy)

class Reducer
   method Reduce(value GroupBy, [v1, v2,...])
      Emit(value GroupBy, aggregate( [v1, v2,...] ) )   // aggregate() : sum(), max(),...
Joining
Joins are perfectly possible in MapReduce framework, but there exist a number of techniques that differ in efficiency and data volumes they are oriented for. In this section we study some basic approaches. The references section contains links to detailed studies of join techniques.
Repartition Join (Reduce Join, Sort-Merge Join)
This algorithm joins two sets R and L on some key k. The Mapper goes through all tuples from R and L, extracts key k from the tuples, marks each tuple with a tag that indicates the set this tuple came from (‘R’ or ‘L’), and emits the tagged tuple using k as a key. The Reducer receives all tuples for a particular key k and puts them into two buckets – one for R and one for L. When the two buckets are filled, the Reducer runs a nested loop over them and emits a cross join of the buckets. Each emitted tuple is a concatenation of the R-tuple, the L-tuple, and key k. This approach has the following disadvantages:
 The Mapper emits absolutely all data, even for keys that occur only in one set and have no pair in the other.
 The Reducer should hold all data for one key in memory. If the data doesn’t fit in memory, it is the Reducer’s responsibility to handle this with some kind of swap.
class Mapper
   method Map(null, tuple [join_key k, value v1, value v2,...])
      Emit(join_key k, tagged_tuple [set_name tag, values [v1, v2, ...] ] )

class Reducer
   method Reduce(join_key k, tagged_tuples [t1, t2,...])
      H = new AssociativeArray : set_name -> values
      for all tagged_tuple t in [t1, t2,...]     // separate values into 2 arrays
         H{t.tag}.add(t.values)
      for all values r in H{ 'R' }               // produce a cross-join of the two arrays
         for all values l in H{ 'L' }
            Emit(null, [k r l] )
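A single-process Python sketch of the repartition join follows. The two sample sets (users as R, log records as L) and the helper names are assumptions made for the illustration; a dictionary plays the role of the shuffle that groups tagged tuples by join key.

```python
from collections import defaultdict

# Hypothetical input: (join key, value) tuples.
R = [(1, "alice"), (2, "bob")]
L = [(1, "login"), (1, "logout"), (3, "login")]


def map_tagged(set_name, rows):
    """Mapper: emit each tuple under its join key, tagged with its source set."""
    for key, value in rows:
        yield key, (set_name, value)


def reduce_join(key, tagged_values):
    """Reducer: split values into an R bucket and an L bucket, then cross-join them."""
    buckets = defaultdict(list)
    for tag, value in tagged_values:
        buckets[tag].append(value)
    for r_value in buckets["R"]:
        for l_value in buckets["L"]:
            yield key, r_value, l_value


shuffled = defaultdict(list)
for set_name, rows in (("R", R), ("L", L)):
    for key, tagged_value in map_tagged(set_name, rows):
        shuffled[key].append(tagged_value)

joined = [row for key in sorted(shuffled) for row in reduce_join(key, shuffled[key])]
print(joined)  # [(1, 'alice', 'login'), (1, 'alice', 'logout')]
```

Keys 2 and 3 still travel through the shuffle even though they produce no output, which is the first disadvantage listed above.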
Replicated Join (Map Join, Hash Join)
In practice, it is typical to join a small set with a large one (say, a list of users with a list of log records). Let’s assume that we join two sets – R and L – and that R is relatively small. If so, R can be distributed to all Mappers, and each Mapper can load it and index it by the join key. The most common and efficient indexing technique here is a hash table. After this, the Mapper goes through the tuples of the set L and joins them with the corresponding tuples from R that are stored in the hash table. This approach is very effective because there is no need to sort the set L or transmit it over the network, but the set R should be small enough to be distributed to all Mappers.
class Mapper
   method Initialize
      H = new AssociativeArray : join_key -> tuple from R
      R = loadR()
      for all [ join_key k, tuple [r1, r2,...] ] in R
         H{k} = H{k}.append( [r1, r2,...] )

   method Map(join_key k, tuple l)
      for all tuple r in H{k}
         Emit(null, tuple [k r l] )
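The replicated join can be sketched in Python with an ordinary dictionary as the hash table. The sample data and the names build_index and map_join are made up for the example; in a real job the small set would be shipped to every Mapper before the map phase starts.

```python
from collections import defaultdict

# Hypothetical input: R is the small set, L is the large one.
R = [(1, "alice"), (2, "bob")]
L = [(1, "login"), (1, "logout"), (3, "login")]


def build_index(small_rows):
    """Initialize step: load the small set and index it by join key."""
    index = defaultdict(list)
    for key, value in small_rows:
        index[key].append(value)
    return index


def map_join(index, key, l_value):
    """Map step: look up the matching R tuples in the hash table; no reducer is needed."""
    for r_value in index.get(key, []):
        yield key, r_value, l_value


index = build_index(R)
joined = [row for key, l_value in L for row in map_join(index, key, l_value)]
print(joined)  # [(1, 'alice', 'login'), (1, 'alice', 'logout')]
```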
References:
Machine Learning and Math MapReduce Algorithms
 C. T. Chu et al. provide an excellent description of machine learning algorithms for MapReduce in the article MapReduce for Machine Learning on Multicore.
 FFT using MapReduce: http://www.slideshare.net/hortonworks/largescalemathwithhadoopmapreduce
 MapReduce for integer factorization: http://www.javiertordable.com/files/MapreduceForIntegerFactorization.pdf
 Matrix multiplication with MapReduce: http://csl.skku.edu/papers/CSTR2010330.pdf and http://www.norstad.org/matrixmultiply/index.html