Both examples use the same mapReduce function, implemented as a Groovy closure. The function accepts two parameters: the mapper and the reducer functions (also closures). The mapper, reducer and mapReduce functions exploit many of the functional programming techniques supported through Groovy closures. To aid development, we have prepared an abstract class, ListFunctions, which defines many of the standard list processing functions (closures) such as map, filter and composition. The abstract class MapFunctions provides similar functions for Maps.
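To give a feel for the style, here is a minimal sketch of how closures such as map, filter, composition and foldL might look and how currying specialises them. These definitions are hypothetical stand-ins written for illustration; they are not the actual source of ListFunctions, and the argument order is only inferred from how the listings below call them.

```groovy
// Hypothetical stand-ins, NOT the real com.adt.fp.ListFunctions definitions.
def map         = { f, list -> list.collect { f(it) } }
def filter      = { p, list -> list.findAll { p(it) } }
def composition = { f, g, x -> f(g(x)) }                // apply g first, then f
def foldL       = { f, init, list -> list.inject(init) { acc, x -> f(acc, x) } }

// Currying fixes the leading arguments and leaves a closure awaiting the rest.
def doubleAll    = map.curry({ it * 2 })
def evens        = filter.curry({ it % 2 == 0 })
def doubledEvens = composition.curry(doubleAll, evens)
def sum          = foldL.curry({ x, y -> x + y }, 0)

assert doubledEvens([1, 2, 3, 4]) == [4, 8]
assert sum([1, 2, 3]) == 6
```

Because every function is a closure whose data argument comes last, partially applied closures can be passed around and composed without naming the data they will eventually process.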
Example 1: Word count
import com.adt.fp.ListFunctions as LF
import com.adt.fp.MapFunctions as MF
def mapReduce(mapper, reducer) {
    // Apply the reducer to each group, discard null results, then unwrap the remaining values
    def reducePerKey = LF.composition.curry(MF.mapWithKey.curry({ k, v -> v }),
        LF.composition.curry(MF.filterWithKey.curry({ k, v -> (v != null) }),
            MF.mapWithKey.curry(reducer)))
    // Fold the [key, value] pairs into a Map, joining the values that share a key
    def groupByKey = { kvList ->
        def insert = { mp, kv -> return MF.insertWith(LF.join, kv[0], [kv[1]], mp) }
        return LF.foldL(insert, MF.empty(), kvList)
    }
    // Convert the input Map to a List, apply the mapper to each entry, concatenate the results
    def mapPerKey = LF.composition.curry(LF.concat,
        LF.composition.curry(LF.map.curry(LF.uncurry(mapper)), MF.toList))
    return LF.composition.curry(reducePerKey, LF.composition.curry(groupByKey, mapPerKey))
}
def flip = { f, x, y -> return f(y, x) }
def mkPair = { x, y -> return [x, y] }
def tokenize = { str -> return str.tokenize() }
def sum = LF.foldL.curry({ x, y -> x + y }, 0)
def occurMapper = { key, words -> return LF.map(flip.curry(mkPair, 1), tokenize(words)) }
def occurReducer = { key, occurs -> sum(occurs) }
def wordOccurrenceCount = mapReduce(occurMapper, occurReducer)
def docs = ['doc1': 'fold the fold', 'doc2': 'appreciate the unfold']
assert wordOccurrenceCount(docs) == ["unfold":1, "the":2, "appreciate":1, "fold":2]
Example 2: Inverted index
import com.adt.fp.ListFunctions as LF
import com.adt.fp.MapFunctions as MF
def mapReduce(mapper, reducer) {
// …
}
def flip = { f, x, y -> return f(y, x) }
def mkPair = { x, y -> return [x, y] }
def tokenize = { str -> return str.tokenize() }
def indexMapper = { key, words -> return LF.map(flip.curry(mkPair, key), tokenize(words)) }
def indexReducer = { key, docs -> docs.unique().sort() }
def invertedIndex = mapReduce(indexMapper, indexReducer)
def docs = ['doc1': 'fold the fold', 'doc2': 'appreciate the unfold']
assert invertedIndex(docs) ==
    ["unfold":["doc2"], "the":["doc1", "doc2"], "appreciate":["doc2"], "fold":["doc1"]]
The mapReduce function returns the function (closure) that performs the required processing. It is defined in terms of its mapper and reducer function parameters, using three support functions: mapPerKey, groupByKey and reducePerKey [http://www.cs.vu.nl/~ralf/MapReduce/]. The helper function mapPerKey converts the Map input data to a List, maps the mapper function over this List and concatenates the results into a single List of intermediate [key, value] pairs. The support function groupByKey groups intermediate values by intermediate keys. Finally, reducePerKey maps the reducer function over the groups of intermediate data and discards any group for which the reducer returns null.
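The three phases can also be sketched using only Groovy's built-in collection methods, which may help when reading the library-based definition. This is an illustrative approximation under that assumption, not the ListFunctions/MapFunctions implementation; the name simpleMapReduce is hypothetical.

```groovy
// Hypothetical plain-Groovy approximation of mapReduce (no com.adt.fp library needed).
def simpleMapReduce = { mapper, reducer ->
    // mapPerKey: apply the mapper to every entry and concatenate the resulting pair lists
    def mapPerKey = { Map input ->
        input.collectMany { k, v -> mapper(k, v) }
    }
    // groupByKey: gather the intermediate values under their intermediate key
    def groupByKey = { List pairs ->
        pairs.inject([:]) { acc, pair ->
            acc[pair[0]] = (acc[pair[0]] ?: []) + [pair[1]]
            acc
        }
    }
    // reducePerKey: apply the reducer to each group and drop null results
    def reducePerKey = { Map groups ->
        def reduced = groups.collectEntries { k, vs -> [(k): reducer(k, vs)] }
        reduced.findAll { k, v -> v != null }
    }
    // f >> g builds a closure that applies f first and then g
    mapPerKey >> groupByKey >> reducePerKey
}

def countWords = simpleMapReduce(
    { key, text -> text.tokenize().collect { [it, 1] } },
    { key, ones -> ones.sum() })

assert countWords(['doc1': 'fold the fold', 'doc2': 'appreciate the unfold']) ==
    ['fold': 2, 'the': 2, 'appreciate': 1, 'unfold': 1]
```

The sketch trades the point-free composition of the original for named intermediate steps, which makes the data flow between the three phases easier to follow.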
In Example 1, the mapper function produces the pair [word, 1] for every word in the source input. It is implemented by mapping the function flip.curry(mkPair, 1) over the words. Because curry binds the leading arguments, currying mkPair directly with 1 would yield [1, word]; flip swaps the two arguments so that mkPair receives the word first and the count second. The reducer function for this example simply sums the 1s in the intermediate values.
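The effect of flip can be checked in isolation. The snippet below reuses flip and mkPair from Example 1 and substitutes Groovy's built-in collect for LF.map so that it runs without the library; the name pairWithOne is introduced here purely for illustration.

```groovy
def flip   = { f, x, y -> f(y, x) }
def mkPair = { x, y -> [x, y] }

// Binding f = mkPair and x = 1 leaves a one-argument closure: word -> mkPair(word, 1)
def pairWithOne = flip.curry(mkPair, 1)
assert pairWithOne('fold') == ['fold', 1]

// Currying mkPair directly binds the FIRST argument, so the 1 ends up in front
assert mkPair.curry(1).call('fold') == [1, 'fold']

// Groovy's rcurry binds the trailing argument and gives the same result as flip
assert mkPair.rcurry(1).call('fold') == ['fold', 1]

// Mapping over the tokens of a document (collect stands in for LF.map)
assert 'fold the fold'.tokenize().collect(pairWithOne) ==
    [['fold', 1], ['the', 1], ['fold', 1]]
```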
In Example 2, the mapper function delivers the pair [word, documentID] for every word in the source input. The reducer function removes any duplicates and sorts the List of documentIDs from the intermediate values.
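The two halves of Example 2 can be exercised on their own, as sketched below. As an assumption made so the snippet runs without the library, Groovy's built-in collect replaces LF.map and the tokenize helper is inlined; the reducer is copied unchanged from the listing.

```groovy
def flip   = { f, x, y -> f(y, x) }
def mkPair = { x, y -> [x, y] }

// Mapper: one [word, documentID] pair per token (collect stands in for LF.map)
def indexMapper = { key, words -> words.tokenize().collect(flip.curry(mkPair, key)) }
assert indexMapper('doc1', 'fold the fold') ==
    [['fold', 'doc1'], ['the', 'doc1'], ['fold', 'doc1']]

// Reducer: de-duplicate and sort the document IDs gathered for one word.
// Note that unique() and sort() modify the list they are called on.
def indexReducer = { key, docs -> docs.unique().sort() }
assert indexReducer('fold', ['doc1', 'doc1']) == ['doc1']
assert indexReducer('the', ['doc2', 'doc1']) == ['doc1', 'doc2']
```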
Sample code and a more expansive description can be found at [Groovy MapReduce].