Sunday, February 16, 2014

MapReduce job explained

In the last post we saw how to run a MapReduce job on Hadoop. Now we're going to analyze how a MapReduce program works. And, if you don't know what MapReduce is, the short answer is "MapReduce is a programming model for processing large data sets with a parallel, distributed algorithm on a cluster" (from Wikipedia).

Let's take a look at the source code: we can find a Java main method that is called from Hadoop, and two inner static classes, the mapper and the reducer. The code for the mapper is:
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

       @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
                StringTokenizer itr = new StringTokenizer(value.toString());
                while (itr.hasMoreTokens()) {
                        word.set(itr.nextToken());
                        context.write(word, one);
                }               
        }
}
As we can see, this class extends Mapper, which - as its JavaDoc says - maps input key/value pairs to a set of intermediate key/value pairs; when the job starts, the Hadoop framework passes to the mapper a chunk of data (a subset of the whole dataset) to process. The output of the mapper will be the input of the reducers (it's not the complete story, but we'll arrive there in another post). The Mapper uses Java generics to specify what kind of data will process; in this example, we use a class that extends Mapper and specifies Object and Text as the classes of key/value pairs in input, and Text and IntWritable as the classes of key/value pairs for the output to the reducers (we'll see the details of those classes in a moment).
Let's examine the code: there's only one overridden method, the map() that takes the key/value pair as arguments and the Hadoop context; every time this method is called by Hadoop, the method receives an offset of the file where the value is as the key, and a line of the text file we're reading as the value.
Hadoop has some basic types that ore optimized for network serialization; here is a table with a few of them:

Java typeHadoop type
IntegerIntWritable
LongLongWritable
DoubleDoubleWritable
StringTextWritable
MapMapWritable
ArrayArrayWritable

Now it's easy to understand what this method does: for every line of the book it receives, it uses a StringTokenizer to split the line into every single word; then it sets the word in the Text object and maps it the the value of 1; then writes it to the mappers via the Hadoop context.

Let's now look at the reducer:
public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
        private IntWritable result = new IntWritable();

       @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable val : values) {
                        sum += val.get();
                }
                result.set(sum);
                context.write(key, result);
        }
}
This time we have the first two arguments of the overridden method reduce that are the same type of the last two of the TokenizerMapper class; that's because - as we said - the mapper outputs the data that the reducer will use as an input. The Hadoop framework takes care of calling this method for every key that comes from the mappers; as we saw before, the keys are the words of the file we're counting the words of.
The reduce method now has to sum all the occurrences of every single word, so it initializes a sum variable to 0 and then loops over all the values for that specific key that it receives from the mappers. For every word it updates the sum variable with the value mapped to that key. At the end of the loop, when all the occurrences of that word are counted, the method sets the value obtained into an IntWritable object and gives it to the Hadoop context to be outputted to the user.

We're now at the main method of the class, which is the one that is called by Hadoop when it's executed as a JAR file.
public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
                System.err.println("Usage: wordcount <in> <out>");
                System.exit(2);
        }
        Job job = new Job(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
}

In the method, we first setup a Configuration object, then we check for the number of arguments passed to it; If the number of arguments is correct, we create a Job object and we set a few values for making it work. Let's dive into the details:
  • setJarByClass: sets the Jar by finding where a given class came from; this needs an explanation: Hadoop distributes the code to execute to the cluster as a JAR file; instead of specifying the name of the JAR, we tell Hadoop the name of the class that every instance on the cluster has to look for inside its classpath
  • setMapperClass: sets the class that will be executed as the mapper
  • setCombinerClass: sets the class that will be executed as the combiner (we'll explain what is a combiner in a future post)
  • setReducerClass: sets the class that will be executed as the reducer
  • setOutputKeyClass: sets the class that will be used as the key for outputting data to the user
  • setOutputValueClass: sets the class that will be used as the value for outputting data to the user
Then we say to Hadoop where it can find the input with the FileInputFormat.addInputPath() method and where it has to write the output with the FileOutputFormat.setOutputPath() method. The last method call is the waitForCompletion(), that submits the job to the cluster and waits for it to finish.

Now that the mechanism of a MapReduce job is more clear, we can start playing with it.