Monday, March 31, 2014

Hadoop Combiners

In the last post and in the preceding one we saw how to write a MapReduce program for finding the top-n items of a data set. The difference between the two was that the first program (which we call basic) emitted to the reducers every single item read from the input, while the second (which we call enhanced) made a partial computation and emitted only a subset of the input. The enhanced top-n optimizes network transmissions (the fewer key-value pairs emitted, the less network traffic is needed to move them from mappers to reducers) and reduces the number of keys to shuffle and sort; but this comes at the cost of rewriting the mapper.

If we look at the code of the mapper of the enhanced top-n, we can see that it implements the idea behind the reducer: it uses a Map to keep a partial count of the words and emits every word only once; looking at the reducer's code, we see that it implements the same idea. If we could execute the code of the basic top-n reducer on every machine just after its mapper has run (on its own subset of the data), we would obtain exactly the same result as rewriting the mapper as in the enhanced version. This is exactly what Hadoop combiners do: they run just after the mapper on every machine to improve performance. To tell Hadoop which class to use as a combiner, we call the Job.setCombinerClass() method.
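As a minimal sketch (not taken from the post; WordCountMapper and IntSumReducer are placeholder names for a word-count-style mapper and a reducer that simply sums its values), a driver that registers the reducer as a combiner could look like this:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountWithCombiner {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count with combiner");
        job.setJarByClass(WordCountWithCombiner.class);
        job.setMapperClass(WordCountMapper.class);   // placeholder mapper class
        job.setCombinerClass(IntSumReducer.class);   // the reducer reused as a combiner
        job.setReducerClass(IntSumReducer.class);    // placeholder summing reducer
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}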

Caution: using the reducer as a combiner works only if the function we're computing is both commutative (a + b = b + a) and associative (a + (b + c) = (a + b) + c).
Let's make an example. Suppose we're analyzing the traffic of a website and we have an input file with the number of visits per day, like this (a date in YYYYMMDD format followed by the number of visits):
20140401 100
20140331 1000
20140330 1300
20140329 5100
20140328 1200
We want to find the day with the highest number of visits.
Let's say we have two mappers; the first one receives the first three lines and the second receives the last two. If we write the mapper to emit every line, the reducer will evaluate something like this:
max(100, 1000, 1300, 5100, 1200) -> 5100
and the max is 5100.
If we use the reducer as a combiner, the reducer will evaluate something like this:
max( max(100, 1000, 1300), max(5100, 1200)) -> max( 1300, 5100) -> 5100
because each of the two mappers evaluates the max function locally on its own lines. In this case the result is 5100 as well, since the function we're evaluating (max) is both commutative and associative.
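To make this concrete, here is a minimal sketch (not taken from the post) of a reducer that computes the maximum of the values it receives; since max is commutative and associative, the same class can also be registered as the combiner. For brevity it assumes the mapper emits the visit count under a single constant key and ignores the problem of carrying the day along with its count:
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxVisitsReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // keep only the highest visit count seen for this key
        int max = Integer.MIN_VALUE;
        for (IntWritable val : values) {
            max = Math.max(max, val.get());
        }
        context.write(key, new IntWritable(max));
    }
}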

Let's say that now we need to compute the average number of visits per day. If we write the mapper to emit every line of the input file, the reducer will evaluate this:
mean(100, 1000, 1300, 5100, 1200) -> 1740
which is 1740.
If we use the reducer as a combiner, the reducer will evaluate something like this:
mean( mean(100, 1000, 1300), mean(5100, 1200)) -> mean( 800, 3150) -> 1975
because each of the two mappers evaluates the mean function locally on its own lines. In this case the result is 1975, which is obviously wrong: the two partial means (800 and 3150) come from groups of different sizes (three days and two days), and once that information is lost the mean of the means no longer equals the mean of the original values (1740).

So, if we're computing a commutative and associative function and we want to improve the performance of our job, we can use our reducer as a combiner; if we want to improve performance but the function we're computing is not commutative and associative, we have to either rewrite the mapper or write a new combiner from scratch.
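For the mean, one common way to write such a combiner from scratch (my sketch, not code from the post) is to emit partial sums and counts instead of partial means, so that no information is lost; the final reducer adds up all the sums and counts and only then divides. The sketch assumes the mapper emits, for every day, a Text value formatted as "visits,1":
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MeanCombiner extends Reducer<Text, Text, Text, Text> {

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // aggregate partial "sum,count" pairs instead of computing a partial mean
        long sum = 0;
        long count = 0;
        for (Text val : values) {
            String[] parts = val.toString().split(",");
            sum += Long.parseLong(parts[0]);
            count += Long.parseLong(parts[1]);
        }
        context.write(key, new Text(sum + "," + count));
    }
}
The reducer performs the same aggregation and, at the end, emits sum divided by count as the mean.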

Sunday, March 16, 2014

Enhanced MapReduce for Top N items

In the last post we saw how to write a MapReduce program for finding the top-n items of a dataset.

The code in the mapper emits a key-value pair for every word found, passing the word as the key and 1 as the value. Since the book has roughly 38,000 words, the amount of data transmitted from the mappers to the reducers is proportional to that number. A way to improve the network performance of this program is to rewrite the mapper as follows:
public static class TopNMapper extends Mapper<Object, Text, Text, IntWritable> {

        private Map<String, Integer> countMap = new HashMap<>();

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String cleanLine = value.toString().toLowerCase().replaceAll("[_|$#<>\\^=\\[\\]\\*/\\\\,;,.\\-:()?!\"']", " ");
            StringTokenizer itr = new StringTokenizer(cleanLine);
            while (itr.hasMoreTokens()) {

                String word = itr.nextToken().trim();
                if (countMap.containsKey(word)) {
                    countMap.put(word, countMap.get(word)+1);
                }
                else {
                    countMap.put(word, 1);
                }
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {

            for (String key: countMap.keySet()) {
                context.write(new Text(key), new IntWritable(countMap.get(key)));
            }
        }
    }
As we can see, we define a HashMap that uses words as keys and the number of occurrences as values; inside the loop, instead of emitting every word to the reducer, we put it into the map: if the word is already present, we increment its count, otherwise we set it to one. We also override the cleanup method, which Hadoop calls when the mapper has finished processing its input; in this method we can now emit the words to the reducers: this way we save a lot of network transmissions, because we send every word to the reducers only once.

The complete code of this class is available on my github.
In the next post we'll see how to use combiners to leverage this approach.

Saturday, March 8, 2014

MapReduce for Top N items

In this post we'll see how to count the top-n items of a dataset; we'll again use the Flatland book we used in a previous post: in that example we used the WordCount program to count the occurrences of every single word in the book; now we want to find the top-n words used in the book.

Let's start with the mapper:
public static class TopNMapper extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String cleanLine = value.toString().toLowerCase().replaceAll("[_|$#<>\\^=\\[\\]\\*/\\\\,;,.\\-:()?!\"']", " ");
            StringTokenizer itr = new StringTokenizer(cleanLine);
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken().trim());
                context.write(word, one);
            }
      }
}
The mapper is really straightforward: the TopNMapper class defines an IntWritable set to 1 and a Text object; its map() method, like in the previous post, splits every line of the book into single words and sends every word to the reducers with a value of 1.

The reducer is more interesting:
public static class TopNReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        private Map<Text, IntWritable> countMap = new HashMap<>();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

            // computes the number of occurrences of a single word
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }

            // puts the word and its number of occurrences into the map; a new Text
            // is created because Hadoop reuses the key instance across calls
            countMap.put(new Text(key), new IntWritable(sum));
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {

            Map<Text, IntWritable> sortedMap = sortByValues(countMap);

            int counter = 0;
            for (Text key: sortedMap.keySet()) {
                if (counter++ == 20) {
                    break;
                }
                context.write(key, sortedMap.get(key));
            }
      }
}
We override two methods: reduce() and cleanup(). Let's examine the reduce() method.
As we've seen in the mapper's code, the keys the reducer receives are the single words contained in the book; at the beginning of the method, we compute the sum of all the values received from the mappers for this key, which is the number of occurrences of that word inside the book; then we put the word and its number of occurrences into a HashMap. Note that we're not putting into the map the Text object that contains the word directly, because that instance is reused many times by Hadoop for performance reasons; instead, we put a new Text object based on the received one.

To output the top-n values, we have to compute the number of occurrences of every word, sort the words by number of occurrences and then extract the first n. In the reduce() method we don't write any value to the output, because we can sort the words only after we have collected them all; the cleanup() method is called by Hadoop after the reducer has received all of its data, so we override this method to be sure that our HashMap is filled up with all the words.
Let's look at the method: first we sort the HashMap by values (using code from this post); then we loop over the key set and output the first 20 items.
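The code of sortByValues() is not shown here; a minimal sketch of what such a helper might look like (ordering the entries by descending value into a LinkedHashMap, so that iterating over the key set yields the most frequent words first) is below. In the reducer it would be a method of the enclosing class; it is wrapped in a standalone class here only to keep the sketch self-contained:
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MapSorter {

    public static <K, V extends Comparable<V>> Map<K, V> sortByValues(Map<K, V> map) {
        // copy the entries into a list and sort them by descending value
        List<Map.Entry<K, V>> entries = new ArrayList<>(map.entrySet());
        Collections.sort(entries, new Comparator<Map.Entry<K, V>>() {
            @Override
            public int compare(Map.Entry<K, V> e1, Map.Entry<K, V> e2) {
                return e2.getValue().compareTo(e1.getValue());
            }
        });
        // a LinkedHashMap preserves the insertion (i.e. sorted) order
        Map<K, V> sorted = new LinkedHashMap<>();
        for (Map.Entry<K, V> entry : entries) {
            sorted.put(entry.getKey(), entry.getValue());
        }
        return sorted;
    }
}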

The complete code is available on my github.

The output of the reducer gives us the 20 most used words in Flatland:
the 2286
of 1634
and 1098
to 1088
a 936
i 735
in 713
that 499
is 429
you 419
my 334
it 330
as 322
by 317
not 317
or 299
but 279
with 273
for 267
be 252
Predictably, the most used words in the book are articles, conjunctions, adjectives, prepositions and personal pronouns.

This MapReduce program is not very efficient: the mappers transfer a lot of data to the reducers, because every single word of the book is emitted together with the number "1", causing a very high network load. The phase in which mappers send data to the reducers is called "shuffle and sort" and is explained in more detail in the free chapter of "Hadoop: The Definitive Guide" by Tom White.

In the next posts we'll see how to improve the performance of the shuffle and sort phase.

Sunday, March 2, 2014

MapReduce patterns

After having modified and run a job in the last post, we can now examine the most frequent patterns we encounter in MapReduce programming.
Although there are many of them, I think that the most important ones are:
  • Summarization
  • Filtering
  • Structural
Let's examine them in detail.

Summarization
By summarization we mean all the jobs that perform numerical computation over a set of data, like:
  • indexing
  • mean (or other statistical functions) computation
  • min/max computation
  • count (we've seen the WordCount example)

Filtering
Filtering is the act of retrieving only a subset of a bigger dataset. The most common use cases are retrieving all the data belonging to a single user or the top-N elements (by some criteria) of the dataset. Another frequent use of filtering is sampling a dataset: when we're dealing with a lot of data, it is usually a good idea to subset the original data by choosing some elements at random to verify the behaviour of our job.
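As an illustration of the sampling case, here is a minimal sketch (not from the post; the class name and the 1% rate are arbitrary) of a mapper that keeps each input record with a fixed probability and drops the rest; a job like this needs no reducer:
import java.io.IOException;
import java.util.Random;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SamplingMapper extends Mapper<Object, Text, NullWritable, Text> {

    private static final double SAMPLE_RATE = 0.01;
    private final Random random = new Random();

    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // emit the record only with probability SAMPLE_RATE, discard it otherwise
        if (random.nextDouble() < SAMPLE_RATE) {
            context.write(NullWritable.get(), value);
        }
    }
}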

Structural
Structural patterns apply when we need to operate on the structure of the data; the most common case is a join across different datasets, like the joins we're used to performing on an RDBMS.

In the next posts, we'll see in more detail how to deal with these patterns.