Sunday, March 16, 2014

Enhanced MapReduce for Top N items

In the last post we saw how to write a MapReduce program for finding the top-n items of a dataset.

The code in the mapper emits a key-value pair for every word found, passing the word as the key and 1 as the value. Since the book has roughly 38,000 words, the amount of data transmitted from mappers to reducers is proportional to that number. One way to improve the network performance of this program is to rewrite the mapper as follows:
    public static class TopNMapper extends Mapper<Object, Text, Text, IntWritable> {

        // Word counts local to this mapper, accumulated across all map() calls
        private Map<String, Integer> countMap = new HashMap<>();

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // Lowercase the line and replace punctuation with spaces
            String cleanLine = value.toString().toLowerCase().replaceAll("[_|$#<>\\^=\\[\\]\\*/\\\\,;,.\\-:()?!\"']", " ");
            StringTokenizer itr = new StringTokenizer(cleanLine);
            while (itr.hasMoreTokens()) {
                String word = itr.nextToken().trim();
                // Count the word locally instead of emitting (word, 1)
                if (countMap.containsKey(word)) {
                    countMap.put(word, countMap.get(word)+1);
                }
                else {
                    countMap.put(word, 1);
                }
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Called once at the end of the task: emit each distinct word with its local total
            for (String key: countMap.keySet()) {
                context.write(new Text(key), new IntWritable(countMap.get(key)));
            }
        }
    }
As we can see, we define a HashMap that uses words as keys and their number of occurrences as values. Inside the loop, instead of emitting every word to the reducer, we put it into the map: if the word is already there, we increment its value; otherwise we set it to one. We also override the cleanup method, which Hadoop calls once when the mapper has finished processing its input; in this method we can now emit the words to the reducers. This way we save a lot of network traffic, because each mapper sends every distinct word to the reducers only once.
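
To get a feel for the saving, here is a minimal stand-alone sketch in plain Java, with no Hadoop dependency. The class name EmissionSketch, its two methods, and the sample lines are made up for illustration, and the tokenization is simplified compared to the mapper above. It compares how many pairs a mapper would emit with and without the local map:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;

// Hypothetical stand-alone sketch: it only mimics the counting logic, not Hadoop itself.
class EmissionSketch {

    // Original approach: one (word, 1) pair per token, so emissions == number of tokens.
    static int naiveEmissions(List<String> lines) {
        int emitted = 0;
        for (String line : lines) {
            emitted += new StringTokenizer(line.toLowerCase()).countTokens();
        }
        return emitted;
    }

    // Enhanced approach: accumulate counts locally, as map() does with countMap;
    // the returned map holds what cleanup() would emit, one entry per distinct word.
    static Map<String, Integer> aggregate(List<String> lines) {
        Map<String, Integer> countMap = new HashMap<>();
        for (String line : lines) {
            StringTokenizer itr = new StringTokenizer(line.toLowerCase());
            while (itr.hasMoreTokens()) {
                // Same effect as the containsKey/put logic in the mapper
                countMap.merge(itr.nextToken(), 1, Integer::sum);
            }
        }
        return countMap;
    }

    public static void main(String[] args) {
        // Stand-in for the lines of one input split
        List<String> split = Arrays.asList("the cat and the dog", "the dog and the bird");
        System.out.println("pairs emitted, one per word:   " + naiveEmissions(split));
        System.out.println("pairs emitted, one per distinct word: " + aggregate(split).size());
    }
}
```

On these two sample lines the first count is 10 and the second is 5. The price is that each mapper has to keep its whole map in memory until cleanup runs.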

The complete code of this class is available on my GitHub.
In the next post we'll see how to use combiners to leverage this approach.