Sunday, February 23, 2014

Creating a JAR for Hadoop

We've seen the internals of MapReduce in the last post. Now we can make a small change to the WordCount and create a JAR that Hadoop can execute.
If we look at the result of the WordCount we ran before, the lines of the file are split only by spaces, so punctuation characters and other symbols remain attached to the words and in some way "invalidate" the count. For example, we can see some of these wrong values:
KING 1
KING'S 1
KING. 5
King 6
King, 4
King. 2
King; 1
The word is always king, but sometimes it appears in upper case, sometimes in lower case, and sometimes with a punctuation character after it. So we'd like to update the behaviour of the WordCount program to count all the occurrences of any word, ignoring punctuation and other symbols. To do so, we have to modify the code of the mapper, since that's where we get the data from the file and split it.
If we look at the code of the mapper:
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
        }
    }
}
we see that the StringTokenizer receives the whole line as its parameter; so, before tokenizing, we remove all the symbols from the line using a regular expression that maps each of these symbols into a space:
String cleanLine = value.toString().toLowerCase().replaceAll("[_|$#<>\\[\\]\\*/\\\\,;,.\\-:()?!\"']", " ");
which simply says: "if you see any of these characters _, |, $, #, <, >, [, ], *, /, \, ,, ;, ., -, :, (, ), ?, !, ", ', transform it into a space". All the backslashes are needed to escape the characters correctly inside the regular expression. Then we also trim each token before counting it:
itr.nextToken().trim()

So now the code looks like this (the changes from the original are the cleanLine regular expression and the trim() on each token):
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String cleanLine = value.toString().toLowerCase().replaceAll("[_|$#<>\\^=\\[\\]\\*/\\\\,;,.\\-:()?!\"']", " ");
        StringTokenizer itr = new StringTokenizer(cleanLine);
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken().trim());
            context.write(word, one);
        }
    }
}
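As a quick sanity check outside Hadoop, here is a small standalone sketch (the class name and the sample line are made up, not part of the original post) that applies the same cleaning and tokenization to a sample string:
import java.util.StringTokenizer;

// Hypothetical helper class, useful only to test the cleaning logic locally.
public class CleanLineTest {
    public static void main(String[] args) {
        String line = "KING'S palace. KING, King; king!";
        // Same regular expression used in the mapper above.
        String cleanLine = line.toLowerCase().replaceAll("[_|$#<>\\^=\\[\\]\\*/\\\\,;,.\\-:()?!\"']", " ");
        StringTokenizer itr = new StringTokenizer(cleanLine);
        while (itr.hasMoreTokens()) {
            // prints: king, s, palace, king, king, king (one token per line)
            System.out.println(itr.nextToken().trim());
        }
    }
}
Note that a token like KING'S is split into king and s, so its king part still contributes to the overall count of king.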
The complete code of the class is available on my github repository.

We now want to create the JAR to be executed; to do so, we have to go to the output directory of our WordCount compilation (the directory that contains the .class files) and create a manifest file named manifest_file that contains something like this:
Manifest-Version: 1.0
Main-Class: samples.wordcount.WordCount

in which we tell the JAR which class to execute at startup. You can find more details in the official Java documentation. Note that there's no need to add classpath information, because the JAR will be run by Hadoop, which already has a classpath containing all the needed libraries.
Now we can launch the command:
$ jar cmf manifest_file wordcount.jar .
which creates a JAR named wordcount.jar containing all the classes starting from the current directory (the . parameter) and uses the content of manifest_file to create the manifest.
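To double-check that the compiled classes and the manifest actually ended up inside the archive, we can list its contents with the standard jar tool (this is generic jar usage, not specific to this example):
$ jar tf wordcount.jar
As an alternative to writing a manifest at all, we could also leave out the Main-Class entry and pass the class name explicitly when launching the job, e.g. hadoop jar wordcount.jar samples.wordcount.WordCount <in> <out>.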
We can run the job as we saw before. Looking at the result file, we can check that there are no more symbols or punctuation characters; the count for the word king is now the sum of all the occurrences we found before:
king 20
which is what we were looking for.

Sunday, February 16, 2014

MapReduce job explained

In the last post we saw how to run a MapReduce job on Hadoop. Now we're going to analyze how a MapReduce program works. And, if you don't know what MapReduce is, the short answer is "MapReduce is a programming model for processing large data sets with a parallel, distributed algorithm on a cluster" (from Wikipedia).

Let's take a look at the source code: we can find a Java main method that is called by Hadoop, and two inner static classes, the mapper and the reducer. The code for the mapper is:
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
                StringTokenizer itr = new StringTokenizer(value.toString());
                while (itr.hasMoreTokens()) {
                        word.set(itr.nextToken());
                        context.write(word, one);
                }               
        }
}
As we can see, this class extends Mapper, which (as its JavaDoc says) maps input key/value pairs to a set of intermediate key/value pairs; when the job starts, the Hadoop framework passes the mapper a chunk of data (a subset of the whole dataset) to process. The output of the mappers will be the input of the reducers (it's not the complete story, but we'll get there in another post). The Mapper class uses Java generics to specify what kind of data it will process; in this example, our class extends Mapper and specifies Object and Text as the classes of the input key/value pairs, and Text and IntWritable as the classes of the key/value pairs output to the reducers (we'll see the details of those classes in a moment).
Let's examine the code: there's only one overridden method, map(), which takes the key/value pair and the Hadoop context as arguments; every time Hadoop calls this method, it receives as the key the offset within the file where the line starts, and as the value a line of the text file we're reading.
Hadoop has some basic types that are optimized for network serialization; here is a table with a few of them:

Java type    Hadoop type
Integer      IntWritable
Long         LongWritable
Double       DoubleWritable
String       Text
Map          MapWritable
Array        ArrayWritable
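As a minimal illustration (this snippet is not part of the WordCount code, and the class name is made up), these wrapper types simply box a plain Java value and expose it through get()/set():
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

// Standalone sketch showing how the Hadoop wrapper types are used.
public class WritableDemo {
    public static void main(String[] args) {
        IntWritable count = new IntWritable(1);
        count.set(count.get() + 1);               // the wrapped int is now 2
        Text word = new Text("king");
        System.out.println(word + "\t" + count);  // prints: king and 2, separated by a tab
    }
}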

Now it's easy to understand what this method does: for every line of the book it receives, it uses a StringTokenizer to split the line into single words; it then sets each word into the Text object, maps it to the value of 1, and writes the pair to the Hadoop context, which will route it towards the reducers.
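For instance, given the made-up input line "the king and the queen", the mapper would emit these intermediate pairs:
(the, 1)
(king, 1)
(and, 1)
(the, 1)
(queen, 1)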

Let's now look at the reducer:
public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
        private IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable val : values) {
                        sum += val.get();
                }
                result.set(sum);
                context.write(key, result);
        }
}
This time, the first two arguments of the overridden reduce method are of the same types as the last two generic parameters of the TokenizerMapper class; that's because (as we said) the mapper outputs the data that the reducer will use as its input. The Hadoop framework takes care of calling this method once for every key that comes from the mappers; as we saw before, the keys are the words of the file we're processing.
The reduce method has to sum all the occurrences of each single word, so it initializes a sum variable to 0 and then loops over all the values it receives for that specific key, adding each of them to sum. At the end of the loop, when all the occurrences of that word have been counted, the method sets the total into an IntWritable object and gives it to the Hadoop context to be output to the user.
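Sticking with the made-up line from the mapper example above, the reducer invoked for the key the would receive the pair (the, [1, 1]) and would write (the, 2) to the output.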

We're now at the main method of the class, which is the one Hadoop calls when the JAR file is executed.
public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
                System.err.println("Usage: wordcount <in> <out>");
                System.exit(2);
        }
        Job job = new Job(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
}

In this method, we first set up a Configuration object, then we check the number of arguments passed to it; if the number of arguments is correct, we create a Job object and set a few values to make it work. Let's dive into the details:
  • setJarByClass: sets the JAR by finding where a given class came from; this needs an explanation: Hadoop distributes the code to execute across the cluster as a JAR file; instead of specifying the name of the JAR, we tell Hadoop the name of a class that every instance on the cluster has to look for inside its classpath
  • setMapperClass: sets the class that will be executed as the mapper
  • setCombinerClass: sets the class that will be executed as the combiner (we'll explain what a combiner is in a future post)
  • setReducerClass: sets the class that will be executed as the reducer
  • setOutputKeyClass: sets the class that will be used as the key for outputting data to the user
  • setOutputValueClass: sets the class that will be used as the value for outputting data to the user
Then we tell Hadoop where it can find the input with the FileInputFormat.addInputPath() method and where it has to write the output with the FileOutputFormat.setOutputPath() method. The last call is waitForCompletion(), which submits the job to the cluster and waits for it to finish.
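One practical note: Hadoop refuses to run the job if the output directory already exists. A minimal, optional sketch (not part of the original code) that removes it before submitting the job could be placed right after the arguments check, assuming the conf and otherArgs variables from the main method above:
// Requires org.apache.hadoop.fs.FileSystem and org.apache.hadoop.fs.Path.
// Optional: remove a pre-existing output directory so a re-run doesn't fail
// with "Output directory ... already exists".
FileSystem fs = FileSystem.get(conf);
Path outputPath = new Path(otherArgs[1]);   // the <out> argument
if (fs.exists(outputPath)) {
    fs.delete(outputPath, true);            // true = delete recursively
}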

Now that the mechanism of a MapReduce job is clearer, we can start playing with it.

Tuesday, February 11, 2014

Running Hadoop Example

In the last post we installed Hadoop 2.2.0 on Ubuntu. Now we'll see how to launch an example MapReduce job on Hadoop.

In the Hadoop directory (which you should find at /opt/hadoop-2.2.0) there is a JAR containing some examples: the exact path is $HADOOP_COMMON_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.2.0.jar.
This JAR contains several example MapReduce programs. We'll launch the WordCount program, which is the equivalent of "Hello, world!" for MapReduce. This program just counts the occurrences of every single word in the file given as input.
To run this example we need to prepare a few things. We assume that the HDFS services are running; if we haven't created a user directory yet, we have to do it now (assuming the Hadoop user we're using is mapred):
$ hadoop fs -mkdir -p /user/mapred
When we pass "fs" as the first argument to the hadoop command, we're telling Hadoop to work on the HDFS filesystem; in this case, we used the "-mkdir" switch to create a new directory on HDFS.
Now that our user has a home directory, we can create a directory that we'll use to load the input file for the MapReduce programs:
$ hadoop fs -mkdir inputdir
We can check the result by issuing an "ls" command on HDFS:
$ hadoop fs -ls 
Found 1 items
drwxr-xr-x   - mapred mrusers        0 2014-02-11 22:54 inputdir
Now we can decide which file we'll count the words of; in this example, I'll use the text of the novella Flatland by Edwin Abbott, which is freely available for download from Project Gutenberg:
$ wget http://www.gutenberg.org/cache/epub/201/pg201.txt
Now we can put this file onto HDFS, more precisely into the inputdir directory we created a moment ago:
$ hadoop fs -put pg201.txt inputdir
The "-put" switch tells Hadoop to take the file from the local filesystem and copy it onto the HDFS filesystem. We can check that the file is really there:
$ hadoop fs -ls inputdir
Found 1 items
drwxr-xr-x   - mapred mrusers        227368 2014-02-11 22:59 inputdir/pg201.txt

Now we're ready to execute the MapReduce program. The Hadoop tarball comes with a JAR containing the WordCount example; we can launch Hadoop with these parameters:
  • jar: we're telling Hadoop we want to execute a mapreduce program contained in a JAR
  • /opt/hadoop-2.2.0/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.2.0.jar: this is the absolute path and filename of the JAR
  • wordcount: tells Hadoop which of the many examples contained in the JAR to run
  • inputdir: the directory on HDFS in which Hadoop can find the input file(s)
  • outputdir: the directory on HDFS in which Hadoop must write the result of the program
$ hadoop jar /opt/hadoop-2.2.0/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.2.0.jar wordcount inputdir outputdir
and the output is:
14/02/11 23:16:19 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
14/02/11 23:16:20 INFO input.FileInputFormat: Total input paths to process : 1
14/02/11 23:16:20 INFO mapreduce.JobSubmitter: number of splits:1
14/02/11 23:16:21 INFO Configuration.deprecation: user.name is deprecated. Instead, use mapreduce.job.user.name
14/02/11 23:16:21 INFO Configuration.deprecation: mapred.jar is deprecated. Instead, use mapreduce.job.jar
14/02/11 23:16:21 INFO Configuration.deprecation: mapred.output.value.class is deprecated. Instead, use mapreduce.job.output.value.class
14/02/11 23:16:21 INFO Configuration.deprecation: mapreduce.combine.class is deprecated. Instead, use mapreduce.job.combine.class
14/02/11 23:16:21 INFO Configuration.deprecation: mapreduce.map.class is deprecated. Instead, use mapreduce.job.map.class
14/02/11 23:16:21 INFO Configuration.deprecation: mapred.job.name is deprecated. Instead, use mapreduce.job.name
14/02/11 23:16:21 INFO Configuration.deprecation: mapreduce.reduce.class is deprecated. Instead, use mapreduce.job.reduce.class
14/02/11 23:16:21 INFO Configuration.deprecation: mapred.input.dir is deprecated. Instead, use mapreduce.input.fileinputformat.inputdir
14/02/11 23:16:21 INFO Configuration.deprecation: mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir
14/02/11 23:16:21 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
14/02/11 23:16:21 INFO Configuration.deprecation: mapred.output.key.class is deprecated. Instead, use mapreduce.job.output.key.class
14/02/11 23:16:21 INFO Configuration.deprecation: mapred.working.dir is deprecated. Instead, use mapreduce.job.working.dir
14/02/11 23:16:21 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1392155226604_0001
14/02/11 23:16:22 INFO impl.YarnClientImpl: Submitted application application_1392155226604_0001 to ResourceManager at /0.0.0.0:8032
14/02/11 23:16:23 INFO mapreduce.Job: The url to track the job: http://hadoop-VirtualBox:8088/proxy/application_1392155226604_0001/
14/02/11 23:16:23 INFO mapreduce.Job: Running job: job_1392155226604_0001
14/02/11 23:16:38 INFO mapreduce.Job: Job job_1392155226604_0001 running in uber mode : false
14/02/11 23:16:38 INFO mapreduce.Job:  map 0% reduce 0%
14/02/11 23:16:47 INFO mapreduce.Job:  map 100% reduce 0%
14/02/11 23:16:57 INFO mapreduce.Job:  map 100% reduce 100%
14/02/11 23:16:58 INFO mapreduce.Job: Job job_1392155226604_0001 completed successfully
14/02/11 23:16:58 INFO mapreduce.Job: Counters: 43
 File System Counters
  FILE: Number of bytes read=121375
  FILE: Number of bytes written=401139
  FILE: Number of read operations=0
  FILE: Number of large read operations=0
  FILE: Number of write operations=0
  HDFS: Number of bytes read=227485
  HDFS: Number of bytes written=88461
  HDFS: Number of read operations=6
  HDFS: Number of large read operations=0
  HDFS: Number of write operations=2
 Job Counters 
  Launched map tasks=1
  Launched reduce tasks=1
  Data-local map tasks=1
  Total time spent by all maps in occupied slots (ms)=7693
  Total time spent by all reduces in occupied slots (ms)=7383
 Map-Reduce Framework
  Map input records=4239
  Map output records=37680
  Map output bytes=366902
  Map output materialized bytes=121375
  Input split bytes=117
  Combine input records=37680
  Combine output records=8341
  Reduce input groups=8341
  Reduce shuffle bytes=121375
  Reduce input records=8341
  Reduce output records=8341
  Spilled Records=16682
  Shuffled Maps =1
  Failed Shuffles=0
  Merged Map outputs=1
  GC time elapsed (ms)=150
  CPU time spent (ms)=5490
  Physical memory (bytes) snapshot=399077376
  Virtual memory (bytes) snapshot=1674149888
  Total committed heap usage (bytes)=314048512
 Shuffle Errors
  BAD_ID=0
  CONNECTION=0
  IO_ERROR=0
  WRONG_LENGTH=0
  WRONG_MAP=0
  WRONG_REDUCE=0
 File Input Format Counters 
  Bytes Read=227368
 File Output Format Counters 
  Bytes Written=88461
The last part of the output is a summary of the execution of the MapReduce program; just before it, we can spot the "Job job_1392155226604_0001 completed successfully" line, which tells us the MapReduce program was executed successfully. As instructed, Hadoop wrote the output into the outputdir directory on HDFS; let's see what's inside this directory:
$ hadoop fs -ls outputdir
Found 2 items
-rw-r--r--   1 mapred mrusers          0 2014-02-11 23:16 outputdir/_SUCCESS
-rw-r--r--   1 mapred mrusers      88461 2014-02-11 23:16 outputdir/part-r-00000
The presence of the _SUCCESS file confirms the successful execution of the job; Hadoop wrote the result of the execution into part-r-00000. We can bring the file back to the filesystem of our machine using the "-get" switch:
$ hadoop fs -get outputdir/part-r-00000 .
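If we only want a quick look at the result without copying the whole file, we can also print it directly from HDFS with the standard "-cat" switch (generic hadoop fs usage, not specific to this example):
$ hadoop fs -cat outputdir/part-r-00000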
Now we can see the content of the file (this is a small subset of the whole file):
...
leading 2
leagues 1
leaning 1
leap    1
leaped  1
learn   7
learned 1
least   23
least.  1
leave   3
leaves  3
leaving 2
lecture 1
led     4
left    9
...
The wordcount program just counts the occurrences of every single word and outputs them, one word per line together with its count.
Well, we've successfully run our first mapreduce job on our Hadoop installation!