Tuesday, January 8, 2008

Google's MapReduce as Groovy closures

Google’s MapReduce programming model [http://labs.google.com/papers/mapreduce.html] serves for processing and generating large data sets. Users specify a mapper function that processes a key/value pair to generate a set of intermediate key/value pairs, and a reducer function that merges all intermediate values associated with the same intermediate key. Many real world tasks are expressible in this model, as shown in the paper. Here we illustrate two simple examples: (a) for counting occurrences of words in documents; and (b) for creating an inverted index of the words in documents.

Both examples use the same mapReduce function, implemented as a Groovy closure. The function accepts two parameters representing, respectively, the mapper and reducer functions (also closures). The mapper, reducer and mapReduce functions exploit many of the functional programming techniques supported through Groovy closures. To aid the development we have prepared an abstract class ListFunctions which defines many of the standard list processing functions (closures) such as map, filter, composition, etc. The abstract class MapFunctions provides similar functions when applied to Maps.


Example 1: Word count

import com.adt.fp.ListFunctions as LF
import com.adt.fp.MapFunctions as MF

def mapReduce(mapper, reducer) {
def reducePerKey = LF.composition.curry(MF.mapWithKey.curry({ k, v -> v}),
LF.composition.curry(MF.filterWithKey.curry({ k, v -> (v != null)}),
MF.mapWithKey.curry(reducer)))

def groupByKey = { kvList ->
def insert = { mp, kv -> return MF.insertWith(LF.join, kv[0], [kv[1]], mp) }
return LF.foldL(insert, MF.empty(), kvList)
}

def mapPerKey = LF.composition.curry(LF.concat,
LF.composition.curry(LF.map.curry(LF.uncurry(mapper)), MF.toList))

return LF.composition.curry(reducePerKey, LF.composition.curry(groupByKey, mapPerKey))
}

def flip = { f, x, y -> return f(y, x) }
def mkPair = { x, y -> return [x, y] }

def tokenize = { str -> return str.tokenize() }
def sum = LF.foldL.curry({ x, y -> x + y }, 0)

def occurMapper = { key, words -> return LF.map(flip.curry(mkPair, 1), tokenize(words)) }
def occurReducer = { key, occurs -> sum(occurs) }

def wordOccurrenceCount = mapReduce(occurMapper, occurReducer)

def docs = ['doc1': 'fold the fold', 'doc2': 'appreciate the unfold']

assert wordOccurrenceCount(docs) == ["unfold":1, "the":2, "appreciate":1, "fold":2]

Example 2: Inverted index

import com.adt.fp.ListFunctions as LF
import com.adt.fp.MapFunctions as MF

def mapReduce(mapper, reducer) {
// …
}

def flip = { f, x, y -> return f(y, x) }
def mkPair = { x, y -> return [x, y] }

def tokenize = { str -> return str.tokenize() }

def indexMapper = { key, words -> return LF.map(flip.curry(mkPair, key), tokenize(words)) }
def indexReducer = { key, docs -> docs.unique().sort() }

def invertedIndex = mapReduce(indexMapper, indexReducer)

def docs = ['doc1': 'fold the fold', 'doc2': 'appreciate the unfold']

assert invertedIndex(docs) ==
["unfold":["doc2"], "the":["doc1", "doc2"], "appreciate":["doc2"], "fold":["doc1"]]


The mapReduce function returns the function (closure) that performs the required processing. It is defined in terms of its mapper function parameter and its reducer function parameter. The definition for mapReduce is in terms of its support functions mapPerKey, groupByKey and reducePerkey [http://www.cs.vu.nl/~ralf/MapReduce/]. The helper function mapPerKey exports the Map input data to a List then maps the mapper function over this List. The support function groupByKey groups intermediate values by intermediate keys. Finally, reducePerKey maps the reducer function over the groups of intermediate data.

In Example 1, the mapper function makes the pairs [word, 1] for every word in the source input. It is implemented by mapping the function flip.curry(mkPair, 1) over each word. This clever function uses mkPair, to produce the pair and flip ensures they are in the correct order. The reducer function for this example simply sums each of the 1s in the intermediate values.

In Example 2, the mapper function delivers the pairs [documentedID, word] for every word in the source input. The reducer function removes any duplicates and sorts the List of documentIDs from the intermediate values.

Sample code and a more expansive decription can be found at [Groovy MapReduce].