Sunday, September 7, 2014

Combining datasets with MapReduce

While in the SQL world combining two or more datasets is very easy - we just need to use the JOIN keyword - with MapReduce things become a little harder. Let's get into it.
Suppose we have two distinct datasets, one for users of a forum and the other for the posts in the forum (data is in TSV - Tab Separated Values - format).
Users dataset:
id   name  reputation
0102 alice 32
0511 bob   27
...
Posts dataset:
id      type      subject   body                                   userid
0028391 question  test      "Hi, what is.."                        0102
0073626 comment   bug       "Guys, I've found.."                   0511
0089234 comment   bug       "Nope, it's not that way.."            0734
0190347 answer    info      "In my opinion it's worth the time.."  1932
...
What we'd like to do is to combine the reputation of each user with the number of questions he/she posted, to see if we can relate one to the other.

The main idea behind combining the two datasets is to leverage the shuffle and sort phase: this process groups together values with the same key, so if we define the user id as the key, we can send to the reducer both the user reputation and the number of his/her posts, because they're attached to the same key (the user id).
Let's see how.
We start with the mapper:
public static class JoinMapper extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {

            // gets filename of the input file for this record
            FileSplit fileSplit = (FileSplit) context.getInputSplit();
            String filename = fileSplit.getPath().getName();

            // creates an array with all the fields of the row we're reading now
            String[] fields = value.toString().split(("\t"));

            // if we're reading the posts file
            if (filename.equals("forum_nodes_no_lf.tsv")) {
                // retrieves the author ID and the type of the post
                String type = fields[1];
                if (type.equals("question")) {
                     String authorId = fields[4];
                     context.write(new Text(authorId), one);
                }
            }
            // if we're reading the users file
            else {
                String authorId = fields[0];
                String reputation = fields[2];

                // we add two to the reputation, because we want the minimum value to be greater than 1,
                // not to be confused with the "one" passed by the other branch of the if
                int reputationValue = Integer.parseInt(reputation) + 2;
                context.write(new Text(authorId), new IntWritable(reputationValue));
            }
        }
    }
First of all, this code assumes that the directory Hadoop is looking in for data contains two files: the users file and the posts file; we use the FileSplit class to obtain the filename Hadoop is currently reading, so that we know whether we're dealing with the users file or the posts file. If it is the posts file, things get a little trickier. For every user, we pass to the reducer a "1" for every question he/she posted on the forum; since we also want to pass the reputation of the user (which can itself be a "0" or a "1"), we have to be careful not to mix up the values. To do this, we add 2 to the reputation, so that, even if it is "0", the value passed to the reducer will be greater than or equal to two. In this way, we know that when the reducer receives a "1" it counts a question posted on the forum, while when it receives a value greater than "1", it is the reputation of the user.
Let's now look at the reducer:
 public static class JoinReducer extends Reducer<Text, IntWritable, Text, Text> {

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

            int postsNumber = 0;
            int reputation = 0;
            String authorId = key.toString();

            for (IntWritable value : values) {

                int intValue = value.get();
                if (intValue == 1) {
                    postsNumber ++;
                }
                else {
                    // we subtract two to get the exact reputation value (see the mapper)
                    reputation = intValue - 2;
                }
            }

            context.write(new Text(authorId), new Text(reputation + "\t" + postsNumber));
        }
    }
As stated before, the reducer will now receive two kinds of data: a "1" if related to the number of posts of the user, and a value greater than one for the reputation. The code in the reducer checks exactly this: if it receives a "1" it increases the number of posts of this user, otherwise it sets his/her reputation. At the end of the method, we tell the reducer to output the authorId, his/her reputation and the number of questions he/she posted on the forum:
userid  reputation  posts# 
0102    55          23
0511    05          11
0734    00          89
1932    19          32
...
and we're ready to analyze these data.
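For completeness, here is a minimal sketch of how a driver for this job could look. The class name (ForumJoin) and the command-line arguments are assumptions made for this example, not code from the original project, and it supposes the JoinMapper and JoinReducer classes shown above are nested inside it:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ForumJoin {

    // JoinMapper and JoinReducer (shown above) are assumed to be nested here

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "forum join");
        job.setJarByClass(ForumJoin.class);

        job.setMapperClass(JoinMapper.class);
        job.setReducerClass(JoinReducer.class);

        // the mapper emits (Text, IntWritable) pairs, the reducer emits (Text, Text) pairs
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // a single input directory containing both the users and the posts TSV files
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}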

Wednesday, May 21, 2014

Implementing Writable interface of Hadoop

As we saw in the previous posts, Hadoop makes heavy use of network transmissions for executing its jobs. As Doug Cutting (the creator of Hadoop) explains in this post on the Lucene mailing list, java.io.Serializable is too heavy for Hadoop's needs, so a new interface has been developed: Writable. Every object you need to emit from mappers to reducers or as an output has to implement this interface in order to make Hadoop transmit the data from/to the nodes in the cluster.

Hadoop comes with several wrappers around primitive types and widely used classes in Java:
Java primitive    Writable implementation
boolean           BooleanWritable
byte              ByteWritable
short             ShortWritable
int               IntWritable, VIntWritable
float             FloatWritable
long              LongWritable, VLongWritable
double            DoubleWritable

Java class        Writable implementation
String            Text
byte[]            BytesWritable
Object            ObjectWritable
null              NullWritable

Java collection   Writable implementation
array             ArrayWritable, ArrayPrimitiveWritable, TwoDArrayWritable
Map               MapWritable
SortedMap         SortedMapWritable
EnumSet           EnumSetWritable

For example, if we need a mapper to emit a String, we need to use a Text object wrapped around the string we want to emit.
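For instance, here is a trivial stand-alone sketch of the wrapping and unwrapping (the class name is made up just for illustration):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class WrapperExample {

    public static void main(String[] args) {
        // wraps a Java String and an int into their Writable counterparts
        Text word = new Text("hadoop");
        IntWritable count = new IntWritable(1);

        // toString() and get() unwrap the original Java values
        System.out.println(word.toString() + " -> " + count.get());
    }
}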

The interface Writable defines two methods:
  • public void write(DataOutput dataOutput) throws IOException
  • public void readFields(DataInput dataInput) throws IOException
The first method, write() is used for writing the data onto the stream, while the second method, readFields(), is used for reading data from the stream. The wrappers we saw above just send and receive their binary representation over a stream.
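To make this concrete, here is a small stand-alone sketch (not from the original post) that round-trips an IntWritable through a byte stream using exactly these two methods:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;

public class WritableRoundTrip {

    public static void main(String[] args) throws IOException {
        // writes the value onto a byte stream...
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        new IntWritable(42).write(new DataOutputStream(bytes));

        // ...and reads it back from those same bytes
        IntWritable copy = new IntWritable();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(copy.get()); // prints 42
    }
}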
Since Hadoop also needs to sort data during the shuffle-and-sort phase, it also requires the Comparable interface to be implemented, so it defines the WritableComparable interface, which extends both Writable and Comparable.
If we need to emit a custom object which has no default wrapper, we need to create a class that implements the WritableComparable interface. In the mean-computation example we saw in this post, we used the SumCount class, which implements WritableComparable (the source code is available on github):
public class SumCount implements WritableComparable<SumCount> {

    DoubleWritable sum;
    IntWritable count;

    public SumCount() {
        set(new DoubleWritable(0), new IntWritable(0));
    }

    public SumCount(Double sum, Integer count) {
        set(new DoubleWritable(sum), new IntWritable(count));
    }

    public void set(DoubleWritable sum, IntWritable count) {
        this.sum = sum;
        this.count = count;
    }

    public DoubleWritable getSum() {
        return sum;
    }

    public IntWritable getCount() {
        return count;
    }

    public void addSumCount(SumCount sumCount) {
        set(new DoubleWritable(this.sum.get() + sumCount.getSum().get()), new IntWritable(this.count.get() + sumCount.getCount().get()));
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {

        sum.write(dataOutput);
        count.write(dataOutput);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {

        sum.readFields(dataInput);
        count.readFields(dataInput);
    }

    @Override
    public int compareTo(SumCount sumCount) {

        // compares the first of the two values
        int comparison = sum.compareTo(sumCount.sum);

         // if they're not equal, return the value of compareTo between the "sum" value
        if (comparison != 0) {
            return comparison;
        }

        // else return the value of compareTo between the "count" value
        return count.compareTo(sumCount.count);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        SumCount sumCount = (SumCount) o;

        return count.equals(sumCount.count) && sum.equals(sumCount.sum);
    }

    @Override
    public int hashCode() {
        int result = sum.hashCode();
        result = 31 * result + count.hashCode();
        return result;
    }
}
As we can see, it's very easy to code the two methods defined by the Writable interface: they just call the write() and readFields() methods of the Writable properties of the SumCount class; it's important to handle the properties in the same order in both write() and readFields(), otherwise it will not work. The other methods of this class are the getters, the setter and the methods needed by the Comparable interface, which should be nothing new to a Java developer.
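A quick way to convince ourselves that the ordering is respected is to round-trip a SumCount through its own write() and readFields(); this is just a sketch (the demo class name is made up, and it assumes the SumCount class above is on the classpath):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class SumCountRoundTrip {

    public static void main(String[] args) throws IOException {
        SumCount original = new SumCount(123.5, 7);

        // write() serializes the sum first, then the count...
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // ...and readFields() reads them back in the same order
        SumCount copy = new SumCount();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(original.equals(copy)); // prints true
    }
}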

Thursday, May 15, 2014

More about Hadoop combiners

Hadoop combiners are a very powerful tool to speed up our computations. We already saw what a combiner is in a previous post and we also saw another form of optimization in this post. Let's put it all together to get the broader idea.
Combiners are an optimization that Hadoop can use to perform a local reduction: the idea is to reduce the key-value pairs directly on the mapper's machine, to avoid transmitting all of them to the reducers.
Let's get back to the Top20 example from the previous post, which finds the 20 most used words in a text. The Hadoop output of this job is shown below:
...
Map input records=4239
Map output records=37817
Map output bytes=359621
Input split bytes=118
Combine input records=0
Combine output records=0
Reduce input groups=4987
Reduce shuffle bytes=435261
Reduce input records=37817
Reduce output records=20
...
As we can see from the counters above, without a combiner we have 4239 input lines for the mappers and 37817 key-value pairs emitted (one pair per word occurrence in the text). Having defined no combiner, the input and output records of the combiners are 0, and so the input records for the reducers are exactly those emitted by the mappers, 37817.

Let's define a simple combiner:
    public static class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

            // computes the number of occurrences of a single word
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }
As we can see, the code has the same logic as the reducer, since its target is the same: reducing key/value pairs.
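Setting the combiner is then a one-line change in the driver. Here is a minimal sketch of what the job setup could look like; the driver class name (Top20Driver) is an assumption, and it supposes the TopNMapper/TopNReducer classes from the other posts of this series and the WordCountCombiner above are visible to it:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Top20Driver {

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "top 20 with combiner");
        job.setJarByClass(Top20Driver.class);

        job.setMapperClass(TopNMapper.class);
        // this single line plugs the combiner in between the mappers and the reducers
        job.setCombinerClass(WordCountCombiner.class);
        job.setReducerClass(TopNReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}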
Running the job having set the combiner gives us this result:
...
Map input records=4239
Map output records=37817
Map output bytes=359621
Input split bytes=116
Combine input records=37817
Combine output records=20
Reduce input groups=20
Reduce shuffle bytes=194
Reduce input records=20
Reduce output records=20
...
Looking at the output from Hadoop, we see that the combiner now has 37817 input records: this means that all the records emitted by the mappers were sent to the combiner; the combiner in turn emitted 20 records, which is exactly the number of records received by the reducers.
Wow, that's a great result! We avoided the transmission of a lot of data: just 20 records instead of 37817 that we had without the combiner.

But there's a big disadvantage to using combiners: since they are an optimization, Hadoop does not guarantee their execution. So, what can we do to ensure a reduction at the mapper level? Simple: we can put the logic of the reducer inside the mapper!

This is exactly what we've done in the mapper of this post. This pattern is called the "in-mapper combiner": the reduce work is started at the mapper level, so that the key-value pairs sent to the reducers are minimized.
Let's see Hadoop output with this pattern (in-mapper combiner and without the stand-alone combiner):
...
Map input records=4239
Map output records=4987
Map output bytes=61522
Input split bytes=118
Combine input records=0
Combine output records=0
Reduce input groups=4987
Reduce shuffle bytes=71502
Reduce input records=4987
Reduce output records=20
...
Compared to the execution of the other mapper (without the in-mapper combining), this mapper emits to the reducers only 4987 records instead of 37817. A big reduction, even if not as big as the one obtained with the stand-alone combiner.
And what happens if we decide to couple the in-mapper combiner pattern and the stand-alone combiner? Well, we've got the best of the two:
...
Map input records=4239
Map output records=4987
Map output bytes=61522
Input split bytes=116
Combine input records=4987
Combine output records=20
Reduce input groups=20
Reduce shuffle bytes=194
Reduce input records=20
Reduce output records=20
...
In this last case, we have the best performance because we're emitting a reduced number of records from the mapper, and the combiner (if it's executed) reduces the size of the data to be transmitted even further. The only downside of this approach I can think of is that it takes more time to code.

Sunday, April 6, 2014

Computing mean with MapReduce

In this post we'll see how to compute the mean of the max temperatures of every month for the city of Milan.
The temperature data is taken from http://archivio-meteo.distile.it/tabelle-dati-archivio-meteo/, but since the data are shown in tabular form, we had to sniff the HTTP conversation to see that the data come from this URL and are in JSON format.
Using Jackson, we transformed this JSON into a format that is simpler to use with Hadoop: CSV. The result of the conversion is this:
01012000,-4.0,5.0
02012000,-5.0,5.1
03012000,-5.0,7.7
04012000,-3.0,9.7
...
If you're curious to see how we transformed it, take a look at the source code.

Let's look at the mapper class for this job:
public static class MeanMapper extends Mapper<Object, Text, Text, SumCount> {

    private final int DATE = 0;
    private final int MIN = 1;
    private final int MAX = 2;

    private Map<Text, List<Double>> maxMap = new HashMap<>();
 
    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {

        // gets the fields of the CSV line
        String[] values = value.toString().split((","));

        // defensive check
        if (values.length != 3) {
            return;
        }

        // gets date and max temperature
        String date = values[DATE];
        Text month = new Text(date.substring(2));
        Double max = Double.parseDouble(values[MAX]);

        // if not present, put this month into the map
        if (!maxMap.containsKey(month)) {
            maxMap.put(month, new ArrayList<Double>());
        }

        // adds the max temperature for this day to the list of temperatures
        maxMap.get(month).add(max);
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {

        // loops over the months collected in the map() method
        for (Text month: maxMap.keySet()) {

            List<Double> temperatures = maxMap.get(month);

            // computes the sum of the max temperatures for this month
            Double sum = 0d;
            for (Double max: temperatures) {
                sum += max;
            }

            // emits the month as the key and a SumCount as the value
            context.write(month, new SumCount(sum, temperatures.size()));
        }
    }
}
As we've seen in the last posts (about optimization and combiners), in the mapper we first put values into a map, and when the input is over, we loop over the keys to sum the values and emit them. Note that we use the SumCount class, a utility class that wraps the two values we need to compute a mean: the sum of all the values and the number of values.
A common error in this kind of computation is making the mapper emit the mean directly; let's see what can happen if we suppose we have a dataset like this:
01012000,0,10.0
02012000,0,20.0
03012000,0,2.0
04012000,0,4.0
05012000,0,3.0
and two mappers, which receive the first two and the last three lines respectively. The first mapper will compute a mean of 15.0, given by (10.0 + 20.0) / 2. The second will compute a mean of 3.0, given by (2.0 + 4.0 + 3.0) / 3. When the reducer receives these two values, it sums them together and divides by two, so the mean will be 9.0, given by (15.0 + 3.0) / 2. But the correct mean for the values in this example is 7.8, which is given by (10.0 + 20.0 + 2.0 + 4.0 + 3.0) / 5.
This error is due to the fact that any mapper can receive any number of lines, so the value it will emit is only a part of the information needed to compute a mean.

If instead of emitting the mean we emit the sum of the values and the number of values, we can overcome the problem. In the example we saw before, the first mapper will emit the pair (30.0, 2) and the second (9.0, 3); if we add up the sums and divide by the total of the counts, we obtain the right result: 39.0 / 5 = 7.8.
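The same arithmetic, written as a tiny stand-alone sketch that reuses the SumCount class described in the post about the Writable interface (the demo class name is made up for this example):

public class PartialMeanDemo {

    public static void main(String[] args) {
        // what the two mappers of the example would emit: (sum, count) pairs
        SumCount first = new SumCount(30.0, 2);   // 10.0 + 20.0
        SumCount second = new SumCount(9.0, 3);   // 2.0 + 4.0 + 3.0

        // the reducer adds the partial sums and the partial counts together...
        first.addSumCount(second);

        // ...and only then divides: 39.0 / 5 = 7.8, the correct mean
        System.out.println(first.getSum().get() / first.getCount().get());
    }
}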

Let's get back to our job and look at the reducer:
public static class MeanReducer extends Reducer<Text, SumCount, Text, DoubleWritable> {

    private Map<Text, SumCount> sumCountMap = new HashMap<>();

    @Override
    public void reduce(Text key, Iterable<SumCount> values, Context context) throws IOException, InterruptedException {

        SumCount totalSumCount = new SumCount();

        // loops over all the SumCount objects received for this month (the "key" param)
        for (SumCount sumCount : values) {

            // sums all of them
            totalSumCount.addSumCount(sumCount);
        }

        // puts the resulting SumCount into a map
        sumCountMap.put(new Text(key), totalSumCount);
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {

        // loops over the months collected in the reduce() method
        for (Text month: sumCountMap.keySet()) {

            double sum = sumCountMap.get(month).getSum().get();
            int count = sumCountMap.get(month).getCount().get();

            // emits the month and the mean of the max temperatures for the month
            context.write(month, new DoubleWritable(sum/count));
        }
    }
}
The reducer is simpler because it just has to retrieve all the SumCount objects emitted by the mappers and add them together. After receiving all the input, it loops over the map of the SumCount objects and emits the month and the mean.

Monday, March 31, 2014

Hadoop Combiners

In the last post and in the preceding one we saw how to write a MapReduce program for finding the top-n items of a data set. The difference between the two was that the first program (which we call basic) emitted to the reducers every single item read from input, while the second (which we call enhanced) made a partial computation and emitted only a subset of the input. The enhanced top-n optimizes network transmissions (the fewer the key-value pairs emitted, the less network is used for transmitting them from mapper to reducer) and reduces the number of keys shuffled and sorted; but this is obtained at the cost of rewriting the mapper.

If we look at the code of the mapper of the enhanced top-n, we can see that it implements the idea behind the reducer: it uses a Map to keep a partial count of the words and emits every word only once; looking at the reducer's code, we see that it implements the same idea. If we could execute the code of the reducer of the basic top-n after the mapper has run on every machine (with its subset of data), we would obtain exactly the same result as rewriting the mapper as in the enhanced version. This is exactly what Hadoop combiners do: they're executed just after the mapper on every machine to improve performance. To tell Hadoop which class to use as a combiner, we can use the Job.setCombinerClass() method.

Caution: using the reducer as a combiner works only if the function we're computing is both commutative (a + b = b + a) and associative (a + (b + c) = (a + b) + c).
Let's make an example. Suppose we're analyzing the traffic of a website and we have an input file with the number of visits per day like this (YYYYMMDD value):
20140401 100
20140331 1000
20140330 1300
20140329 5100
20140328 1200
We want to find which is the day with the highest number of visits.
Let's say that we have two mappers; the first one receives the first three lines and the second receives the last two. If we write the mapper to emit every line, the reducer will evaluate something like this:
max(100, 1000, 1300, 5100, 1200) -> 5100
and the max is 5100.
If we use the reducer as a combiner, the reducer will evaluate something like this:
max( max(100, 1000, 1300), max(5100, 1200)) -> max( 1300, 5100) -> 5100
because each of the two mappers will evaluate the max function locally. In this case the result will be 5100 as well, since the function we're evaluating (the max function) is both commutative and associative.

Let's say that now we need to compute the average number of visits per day. If we write the mapper to emit every line of the input file, the reducer will evaluate this:
mean(100, 1000, 1300, 5100, 1200) -> 1740
which is 1740.
If we use the reducer as a combiner, the reducer will evaluate something like this:
mean( mean(100, 1000, 1300), mean(5100, 1200)) -> mean( 800, 3150) -> 1975
because each of the two mappers will evaluate the mean function locally. In this case the result will be 1975, which is obviously wrong.

So, if we're computing a commutative and associative function and we want to improve the performance of our job, we can use our reducer as a combiner; if we want to improve performance but we're computing a function that is not commutative and associative, we have to rewrite the mapper or write a new combiner from scratch.

Sunday, March 16, 2014

Enhanced MapReduce for Top N items

In the last post we saw how to write a MapReduce program for finding the top-n items of a dataset.

The code in the mapper emits a pair key-value for every word found, passing the word as the key and 1 as the value. Since the book has roughly 38,000 words, this means that the information transmitted from mappers to reducers is proportional to that number. A way to improve network performance of this program is to rewrite the mapper as follows:
public static class TopNMapper extends Mapper<Object, Text, Text, IntWritable> {

        private Map<String, Integer> countMap = new HashMap<>();

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String cleanLine = value.toString().toLowerCase().replaceAll("[_|$#<>\\^=\\[\\]\\*/\\\\,;,.\\-:()?!\"']", " ");
            StringTokenizer itr = new StringTokenizer(cleanLine);
            while (itr.hasMoreTokens()) {

                String word = itr.nextToken().trim();
                if (countMap.containsKey(word)) {
                    countMap.put(word, countMap.get(word)+1);
                }
                else {
                    countMap.put(word, 1);
                }
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {

            for (String key: countMap.keySet()) {
                context.write(new Text(key), new IntWritable(countMap.get(key)));
            }
        }
    }
As we can see, we define a HashMap that uses words as keys and the number of occurrences as values; inside the loop, instead of emitting every word to the reducer, we put it into the map: if the word is already there, we increase its value, otherwise we set it to one. We also override the cleanup method, which Hadoop calls when the mapper has finished processing its input; in this method we can now emit the words to the reducers: this way, we save a lot of network transmissions because we send every word to the reducers only once.

The complete code of this class is available on my github.
In the next post we'll see how to use combiners to leverage this approach.

Saturday, March 8, 2014

MapReduce for Top N items

In this post we'll see how to find the top-n items of a dataset; we'll again use the Flatland book we used in a previous post: in that example we used the WordCount program to count the occurrences of every single word in the book; now we want to find which are the top-n words used in the book.

Let's start with the mapper:
public static class TopNMapper extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String cleanLine = value.toString().toLowerCase().replaceAll("[_|$#<>\\^=\\[\\]\\*/\\\\,;,.\\-:()?!\"']", " ");
            StringTokenizer itr = new StringTokenizer(cleanLine);
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken().trim());
                context.write(word, one);
            }
      }
}
The mapper is really straightforward: the TopNMapper class defines an IntWritable set to 1 and a Text object; its map() method, like in the previous post, splits every line of the book into single words and sends every word to the reducers with a value of 1.

The reducer is more interesting:
public static class TopNReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        private Map<Text, IntWritable> countMap = new HashMap<>();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

            // computes the number of occurrences of a single word
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }

            // puts the number of occurrences of this word into the map.
            countMap.put(new Text(key), new IntWritable(sum));
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {

            Map<Text, IntWritable> sortedMap = sortByValues(countMap);

            int counter = 0;
            for (Text key: sortedMap.keySet()) {
                if (counter ++ == 20) {
                    break;
                }
                context.write(key, sortedMap.get(key));
            }
      }
}
We override two methods: reduce() and cleanup(). Let's examine the reduce() method.
As we've seen in the mapper's code, the keys the reducer receives are the single words contained in the book; at the beginning of the method, we compute the sum of all the values received from the mappers for this key, which is the number of occurrences of this word inside the book; then we put the word and the number of occurrences into a HashMap. Note that we're not directly putting into the map the Text object that contains the word, because that instance is reused many times by Hadoop for performance reasons; instead, we put a new Text object based on the received one.

To output the top-n values, we have to compute the number of occurrences of every word, sort the words by the number of occurrences and then extract the first n. In the reduce() method we don't write any value to the output, because we can sort the words only after we've collected them all; the cleanup() method is called by Hadoop after the reducer has received all its data, so we override this method to be sure that our HashMap is filled up with all the words.
Let's look at the method: first we sort the HashMap by values (using code from this post); then we loop over the key set and output the first 20 items.
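The sortByValues() helper comes from the linked post and is not shown here; a possible implementation (just a sketch, which also needs the java.util imports) that returns the entries ordered by descending value could look like this:

private static <K, V extends Comparable<V>> Map<K, V> sortByValues(Map<K, V> map) {

    // copies the entries into a list and sorts them by value, highest first
    List<Map.Entry<K, V>> entries = new ArrayList<>(map.entrySet());
    Collections.sort(entries, new Comparator<Map.Entry<K, V>>() {
        @Override
        public int compare(Map.Entry<K, V> e1, Map.Entry<K, V> e2) {
            return e2.getValue().compareTo(e1.getValue());
        }
    });

    // a LinkedHashMap preserves the insertion (i.e. sorted) order
    Map<K, V> sorted = new LinkedHashMap<>();
    for (Map.Entry<K, V> entry : entries) {
        sorted.put(entry.getKey(), entry.getValue());
    }
    return sorted;
}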

The complete code is available on my github.

The output of the reducer gives us the 20 most used words in Flatland:
the 2286
of 1634
and 1098
to 1088
a 936
i 735
in 713
that 499
is 429
you 419
my 334
it 330
as 322
by 317
not 317
or 299
but 279
with 273
for 267
be 252
Predictably, the most used words in the book are articles, conjunctions, adjectives, prepositions and personal pronouns.

This MapReduce program is not very efficient: the mappers transfer a lot of data to the reducers; every single word of the book is emitted to the reducers together with the number "1", causing a very high network load. The phase in which mappers send data to the reducers is called "shuffle and sort" and is explained in more detail in the free chapter of "Hadoop: The Definitive Guide" by Tom White.

In the next posts we'll see how to improve the performance of the shuffle and sort phase.

Sunday, March 2, 2014

MapReduce patterns

After having modified and run a job in the last post, we can now examine the most frequent patterns we encounter in MapReduce programming.
Although there are many of them, I think that the most important ones are:
  • Summarization
  • Filtering
  • Structural
Let's examine them in detail.

Summarization
By summarization we mean all the jobs that perform numerical computation over a set of data, like:
  • indexing
  • mean (or other statistical functions) computation
  • min/max computation
  • count (we've seen the WordCount example)

Filtering
Filtering is the act of retrieving only a subset of a bigger dataset. The most common use cases are retrieving all data belonging to a single user or the top-N elements (by some criteria) of the dataset. Another frequent use of filtering is sampling a dataset: when we're dealing with a lot of data, it is usually a good idea to subset the original data by choosing some elements randomly to verify the behaviour of our job.
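As an example of the sampling case, a mapper-only job can randomly keep a fraction of the records. This is a minimal sketch (the class name and the 1% rate are arbitrary); the driver would also call job.setNumReduceTasks(0) so that the mapper output is written directly to HDFS:

import java.io.IOException;
import java.util.Random;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SamplingMapper extends Mapper<Object, Text, NullWritable, Text> {

    private final Random random = new Random();

    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        // keeps roughly 1% of the input records and discards the rest
        if (random.nextDouble() < 0.01) {
            context.write(NullWritable.get(), value);
        }
    }
}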

Structural
Structural patterns are needed when you have to operate on the structure of the data; the most common case is a join between different datasets, like the ones we're used to performing on an RDBMS.

In the next posts, we'll see in more detail how to deal with these patterns.

Sunday, February 23, 2014

Creating a JAR for Hadoop

We've seen the internals of MapReduce in the last post. Now we can make a little change to the WordCount program and create a JAR to be executed by Hadoop.
If we look at the result of the WordCount we ran before, the lines of the file are only split by spaces, and thus all other punctuation characters and symbols remain attached to the words and in some way "invalidate" the count. For example, we can see some of these wrong values:
KING 1
KING'S 1
KING. 5
King 6
King, 4
King. 2
King; 1
The word is always "king" but sometimes it appears in upper case, sometimes in lower case, or with a punctuation character after it. So we'd like to update the behaviour of the WordCount program to count all the occurrences of any word, regardless of punctuation and other symbols. In order to do so, we have to modify the code of the mapper, since it's there that we get the data from the file and split it.
If we look at the code of the mapper:
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {

      private final static IntWritable one = new IntWritable(1);
      private Text word = new Text();

      public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
          StringTokenizer itr = new StringTokenizer(value.toString());
          while (itr.hasMoreTokens()) {
              word.set(itr.nextToken());
              context.write(word, one);
          }
     }
}
we see that the StringTokenizer takes the line as the parameter; before doing that, we remove all the symbols from the line using a RegExp that maps each of these symbols into a space:
String cleanLine = value.toString().toLowerCase().replaceAll("[_|$#<>\\[\\]\\*/\\\\,;,.\\-:()?!\"']", " ");
that simply says "if you see any of these character _, |, $, #, <, >, [, ], *, /, \, ,, ;, ., -, :, ,(, ), ?, !, ", ' transform it into a space". All the backslashes are needed for correctly escaping the characters. Then we trim the token to avoid empty tokens:
itr.nextToken().trim()

So now the code looks like this:
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {

 private final static IntWritable one = new IntWritable(1);
 private Text word = new Text();

    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String cleanLine = value.toString().toLowerCase().replaceAll("[_|$#<>\\^=\\[\\]\\*/\\\\,;,.\\-:()?!\"']", " ");
        StringTokenizer itr = new StringTokenizer(cleanLine);
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken().trim());
            context.write(word, one);
        }
    }
}
The complete code of the class is available on my github repository.

We now want to create the JAR to be executed; to do that, we have to go to the output directory of our WordCount build (the directory that contains the .class files) and create a manifest file named manifest_file that contains something like this:
Manifest-Version: 1.0
Main-Class: samples.wordcount.WordCount

in which we tell the JAR which class to execute at startup. More details can be found in the official Java documentation. Note that there's no need to add classpath info, because the JAR will be run by Hadoop, which already has a classpath that contains all the needed libraries.
Now we can launch the command:
$ jar cmf manifest_file wordcount.jar .
which creates a JAR named wordcount.jar containing all the classes starting from this directory (the . parameter) and using the content of manifest_file to create the manifest.
We can run the job as we saw before. Looking at the result file we can check that there are no more symbols and punctuation characters; the count for the word king is now the sum of all the occurrences we found before:
king 20
which is what we were looking for.

Sunday, February 16, 2014

MapReduce job explained

In the last post we saw how to run a MapReduce job on Hadoop. Now we're going to analyze how a MapReduce program works. And, if you don't know what MapReduce is, the short answer is "MapReduce is a programming model for processing large data sets with a parallel, distributed algorithm on a cluster" (from Wikipedia).

Let's take a look at the source code: we can find a Java main method that is called from Hadoop, and two inner static classes, the mapper and the reducer. The code for the mapper is:
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

       @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
                StringTokenizer itr = new StringTokenizer(value.toString());
                while (itr.hasMoreTokens()) {
                        word.set(itr.nextToken());
                        context.write(word, one);
                }               
        }
}
As we can see, this class extends Mapper, which - as its JavaDoc says - maps input key/value pairs to a set of intermediate key/value pairs; when the job starts, the Hadoop framework passes to the mapper a chunk of data (a subset of the whole dataset) to process. The output of the mapper will be the input of the reducers (it's not the complete story, but we'll get there in another post). The Mapper class uses Java generics to specify what kind of data it will process; in this example, we extend Mapper specifying Object and Text as the classes of the input key/value pairs, and Text and IntWritable as the classes of the key/value pairs output to the reducers (we'll see the details of those classes in a moment).
Let's examine the code: there's only one overridden method, map(), which takes the key/value pair and the Hadoop context as arguments; every time this method is called by Hadoop, it receives as the key the offset within the file where the value starts, and as the value a line of the text file we're reading.
Hadoop has some basic types that are optimized for network serialization; here is a table with a few of them:

Java type   Hadoop type
Integer     IntWritable
Long        LongWritable
Double      DoubleWritable
String      Text
Map         MapWritable
Array       ArrayWritable

Now it's easy to understand what this method does: for every line of the book it receives, it uses a StringTokenizer to split the line into single words; then it sets each word in the Text object, maps it to the value of 1, and emits the pair via the Hadoop context.

Let's now look at the reducer:
public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
        private IntWritable result = new IntWritable();

       @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable val : values) {
                        sum += val.get();
                }
                result.set(sum);
                context.write(key, result);
        }
}
This time the first two arguments of the overridden reduce() method are of the same types as the last two type parameters of the TokenizerMapper class; that's because - as we said - the mapper outputs the data that the reducer will use as its input. The Hadoop framework takes care of calling this method for every key that comes from the mappers; as we saw before, the keys are the words of the file whose words we're counting.
The reduce method now has to sum all the occurrences of every single word, so it initializes a sum variable to 0 and then loops over all the values it receives from the mappers for that specific key. For every value, it updates the sum variable. At the end of the loop, when all the occurrences of that word are counted, the method sets the resulting value into an IntWritable object and gives it to the Hadoop context to be output to the user.

We're now at the main method of the class, which is the one that is called by Hadoop when it's executed as a JAR file.
public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
                System.err.println("Usage: wordcount <in> <out>");
                System.exit(2);
        }
        Job job = new Job(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
}

In the method, we first set up a Configuration object, then we check the number of arguments passed to it; if the number of arguments is correct, we create a Job object and set a few values to make it work. Let's dive into the details:
  • setJarByClass: sets the Jar by finding where a given class came from; this needs an explanation: Hadoop distributes the code to execute to the cluster as a JAR file; instead of specifying the name of the JAR, we tell Hadoop the name of the class that every instance on the cluster has to look for inside its classpath
  • setMapperClass: sets the class that will be executed as the mapper
  • setCombinerClass: sets the class that will be executed as the combiner (we'll explain what a combiner is in a future post)
  • setReducerClass: sets the class that will be executed as the reducer
  • setOutputKeyClass: sets the class that will be used as the key for outputting data to the user
  • setOutputValueClass: sets the class that will be used as the value for outputting data to the user
Then we tell Hadoop where it can find the input with the FileInputFormat.addInputPath() method and where it has to write the output with the FileOutputFormat.setOutputPath() method. The last method call is waitForCompletion(), which submits the job to the cluster and waits for it to finish.

Now that the mechanism of a MapReduce job is more clear, we can start playing with it.

Tuesday, February 11, 2014

Running Hadoop Example

In the last post we've installed Hadoop 2.2.0 on Ubuntu. Now we'll see how to launch an example mapreduce task on Hadoop.

In the Hadoop directory (which you should find at /opt/hadoop/2.2.0) you can find a JAR containing some examples: the exact path is $HADOOP_COMMON_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.2.0.jar .
This JAR contains different examples of MapReduce programs. We'll launch the WordCount program, which is the equivalent of "Hello, world" for MapReduce. This program simply counts the occurrences of every single word in the file given as input.
To run this example we need to prepare a few things. We assume that we have the HDFS service running; if we didn't create a user directory yet, we have to do it now (assuming the hadoop user we're using is mapred):
$ hadoop fs -mkdir -p /user/mapred
When we pass "fs" as the first argument to the hadoop command, we're telling hadoop to work on HDFS filesystem; in this case, we used the mkdir command as a switch to create a new directory on HDFS.
Now that our user has a home directory, we can create a directory that we'll use to load the input file for the MapReduce programs:
$ hadoop fs -mkdir inputdir
We can check the result issuing a "ls" command on HDFS:
$ hadoop fs -ls 
Found 1 items
drwxr-xr-x   - mapred mrusers        0 2014-02-11 22:54 inputdir
Now we can decide which file we'll count the words of; in this example, I'll use the text of the novella Flatland by Edwin Abbott, which is freely available for download from Project Gutenberg:
$ wget http://www.gutenberg.org/cache/epub/201/pg201.txt
Now we can put this file onto the HDFS, more precisely into the inputdir dir we created a moment ago:
$ hadoop fs -put pg201.txt inputdir
The switch "-put" tells Hadoop to get the file from the machine's file system and to put it onto the HDFS filesystem. We can check that the file is really there:
$ hadoop fs -ls inputdir
Found 1 items
drwxr-xr-x   - mapred mrusers        227368 2014-02-11 22:59 inputdir/pg201.txt

Now we're ready to execute the MapReduce program. Hadoop tarball comes with a JAR containing the WordCount example; we can launch Hadoop with these parameters:
  • jar: we're telling Hadoop we want to execute a mapreduce program contained in a JAR
  • /opt/hadoop-2.2.0/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.2.0.jar: this is the absolute path and filename of the JAR
  • wordcount: tells Hadoop which of the many examples contained in the JAR to run
  • inputdir: the directory on HDFS in which Hadoop can find the input file(s)
  • outputdir: the directory on HDFS in which Hadoop must write the result of the program
$ hadoop jar /opt/hadoop-2.2.0/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.2.0.jar wordcount inputdir outputdir
and the output is:
14/02/11 23:16:19 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
14/02/11 23:16:20 INFO input.FileInputFormat: Total input paths to process : 1
14/02/11 23:16:20 INFO mapreduce.JobSubmitter: number of splits:1
14/02/11 23:16:21 INFO Configuration.deprecation: user.name is deprecated. Instead, use mapreduce.job.user.name
14/02/11 23:16:21 INFO Configuration.deprecation: mapred.jar is deprecated. Instead, use mapreduce.job.jar
14/02/11 23:16:21 INFO Configuration.deprecation: mapred.output.value.class is deprecated. Instead, use mapreduce.job.output.value.class
14/02/11 23:16:21 INFO Configuration.deprecation: mapreduce.combine.class is deprecated. Instead, use mapreduce.job.combine.class
14/02/11 23:16:21 INFO Configuration.deprecation: mapreduce.map.class is deprecated. Instead, use mapreduce.job.map.class
14/02/11 23:16:21 INFO Configuration.deprecation: mapred.job.name is deprecated. Instead, use mapreduce.job.name
14/02/11 23:16:21 INFO Configuration.deprecation: mapreduce.reduce.class is deprecated. Instead, use mapreduce.job.reduce.class
14/02/11 23:16:21 INFO Configuration.deprecation: mapred.input.dir is deprecated. Instead, use mapreduce.input.fileinputformat.inputdir
14/02/11 23:16:21 INFO Configuration.deprecation: mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir
14/02/11 23:16:21 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
14/02/11 23:16:21 INFO Configuration.deprecation: mapred.output.key.class is deprecated. Instead, use mapreduce.job.output.key.class
14/02/11 23:16:21 INFO Configuration.deprecation: mapred.working.dir is deprecated. Instead, use mapreduce.job.working.dir
14/02/11 23:16:21 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1392155226604_0001
14/02/11 23:16:22 INFO impl.YarnClientImpl: Submitted application application_1392155226604_0001 to ResourceManager at /0.0.0.0:8032
14/02/11 23:16:23 INFO mapreduce.Job: The url to track the job: http://hadoop-VirtualBox:8088/proxy/application_1392155226604_0001/
14/02/11 23:16:23 INFO mapreduce.Job: Running job: job_1392155226604_0001
14/02/11 23:16:38 INFO mapreduce.Job: Job job_1392155226604_0001 running in uber mode : false
14/02/11 23:16:38 INFO mapreduce.Job:  map 0% reduce 0%
14/02/11 23:16:47 INFO mapreduce.Job:  map 100% reduce 0%
14/02/11 23:16:57 INFO mapreduce.Job:  map 100% reduce 100%
14/02/11 23:16:58 INFO mapreduce.Job: Job job_1392155226604_0001 completed successfully
14/02/11 23:16:58 INFO mapreduce.Job: Counters: 43
 File System Counters
  FILE: Number of bytes read=121375
  FILE: Number of bytes written=401139
  FILE: Number of read operations=0
  FILE: Number of large read operations=0
  FILE: Number of write operations=0
  HDFS: Number of bytes read=227485
  HDFS: Number of bytes written=88461
  HDFS: Number of read operations=6
  HDFS: Number of large read operations=0
  HDFS: Number of write operations=2
 Job Counters 
  Launched map tasks=1
  Launched reduce tasks=1
  Data-local map tasks=1
  Total time spent by all maps in occupied slots (ms)=7693
  Total time spent by all reduces in occupied slots (ms)=7383
 Map-Reduce Framework
  Map input records=4239
  Map output records=37680
  Map output bytes=366902
  Map output materialized bytes=121375
  Input split bytes=117
  Combine input records=37680
  Combine output records=8341
  Reduce input groups=8341
  Reduce shuffle bytes=121375
  Reduce input records=8341
  Reduce output records=8341
  Spilled Records=16682
  Shuffled Maps =1
  Failed Shuffles=0
  Merged Map outputs=1
  GC time elapsed (ms)=150
  CPU time spent (ms)=5490
  Physical memory (bytes) snapshot=399077376
  Virtual memory (bytes) snapshot=1674149888
  Total committed heap usage (bytes)=314048512
 Shuffle Errors
  BAD_ID=0
  CONNECTION=0
  IO_ERROR=0
  WRONG_LENGTH=0
  WRONG_MAP=0
  WRONG_REDUCE=0
 File Input Format Counters 
  Bytes Read=227368
 File Output Format Counters 
  Bytes Written=88461
The last part of the output is a summary of the execution of the MapReduce program; just before it, we can spot the "Job job_1392155226604_0001 completed successfully" line, which tells us the MapReduce program was executed successfully. As mentioned, Hadoop wrote the output to outputdir on HDFS; let's see what's inside this directory:
$ hadoop fs -ls outputdir
Found 2 items
-rw-r--r--   1 mapred mrusers          0 2014-02-11 23:16 outputdir/_SUCCESS
-rw-r--r--   1 mapred mrusers      88461 2014-02-11 23:16 outputdir/part-r-00000
The presence of the _SUCCESS file confirms the successful execution of the job; Hadoop wrote the result of the execution to the part-r-00000 file. We can bring the file up to the filesystem of our machine using the "get" switch:
$ hadoop fs -get outputdir/part-r-00000 .
Now we can see the content of the file (this is a small subset of the whole file):
...
leading 2
leagues 1
leaning 1
leap    1
leaped  1
learn   7
learned 1
least   23
least.  1
leave   3
leaves  3
leaving 2
lecture 1
led     4
left    9
...
The wordcount program simply counts the occurrences of every single word and outputs them.
Well, we've successfully run our first mapreduce job on our Hadoop installation!

Friday, January 24, 2014

Scraping data from web pages in R with XML package

In recent years a lot of data has been released publicly in different formats, but sometimes the data we're interested in is still inside the HTML of a web page: let's see how to get it.

One of the existing packages for doing this job is the XML package. This package allows us to read and create XML and HTML documents; among its many features, there's a function called readHTMLTable() that analyzes the parsed HTML and returns the tables present in the page. The details are available in the official documentation of the package.

Let's start.
Suppose we're interested in the Italian demographic info present on this page http://sdw.ecb.europa.eu/browse.do?node=2120803 from the EU website. We start by loading and parsing the page:
page <- "http://sdw.ecb.europa.eu/browse.do?node=2120803"
parsed <- htmlParse(page)
Now that we have parsed HTML, we can use the readHTMLTable() function to return a list with all the tables present in the page; we'll call the function with these parameters:
  • parsed: the parsed HTML
  • skip.rows: the rows we want to skip (at the beginning of this table there are a couple of rows that don't contain data but just formatting elements)
  • colClasses: the datatype of the different columns of the table (in our case all the columns have integer values); the rep() function is used to replicate 31 times the "integer" value
table <- readHTMLTable(parsed, skip.rows=c(1,3,4,5), colClasses = c(rep("integer", 31)))
As we can see from the page source code, this web page contains six HTML tables; the one that contains the data we're interested in is the fifth, so we extract that one from the list of tables, as a data frame:
values <- as.data.frame(table[5])
Just for convenience, we rename the columns with the period and italian data:
# renames the columns for the period and Italy
colnames(values)[1] <- 'Period'
colnames(values)[19] <- 'Italy'
The Italian data runs from 1990 to 2014, so we have to subset only those rows and, of course, only the two columns with the period and the Italian data:
# subsets the data: we are interested only in the first and the 19th column (period and italian info)
ids <- values[c(1,19)]

# Italy has only 25 years of info, so we cut away the other rows
ids <- as.data.frame(ids[1:25,])
Now we can plot these data calling the plot function with these parameters:
  • ids: the data to plot
  • xlab: the label of the X axis
  • ylab: the label of the Y axis
  • main: the title of the plot
  • pch: the symbol to draw for every point (19 is a solid circle: look here for an overview)
  • cex: the size of the symbol
plot(ids, xlab="Year", ylab="Population in thousands", main="Population 1990-2014", pch=19, cex=0.5)
and here is the result:

Here's the full code, also available on my github:
library(XML)

# sets the URL
url <- "http://sdw.ecb.europa.eu/browse.do?node=2120803"

# let the XML library parse the HTMl of the page
parsed <- htmlParse(url)

# reads the HTML table present inside the page, paying attention
# to the data types contained in the HTML table
table <- readHTMLTable(parsed, skip.rows=c(1,3,4,5), colClasses = c(rep("integer", 31) ))

# this web page contains six HTML tables, but the one that contains the data
# is the fifth
values <- as.data.frame(table[5])

# renames the columns for the period and Italy
colnames(values)[1] <- 'Period'
colnames(values)[19] <- 'Italy'

# now subsets the data: we are interested only in the first and 
# the 19th column (period and Italy info)
ids <- values[c(1,19)]

# Italy has only 25 years of info, so we cut away the others
ids <- as.data.frame(ids[1:25,])

# plots the data
plot(ids, xlab="Year", ylab="Population in thousands", main="Population 1990-2014", pch=19, cex=0.5)

Wednesday, January 15, 2014

Creating a choropleth map with R

A choropleth map is a thematic map in which areas are shaded or patterned in proportion to the measurement of the statistical variable being displayed on the map (see the full article on Wikipedia).
In this post we'll see how to create a choropleth map with the R programming language. We'll draw a map of Italy and shade it with unemployment data for its administrative regions. First of all, we need the administrative borders of the country: the Italian national statistics institute (ISTAT) has made these data available on this page: http://www3.istat.it/dati/catalogo/20061102_00/; the data we're interested in comes as a zipped file that contains the shapefiles. The shapefile format is a geospatial data format that can describe polygons, and our administrative regions are polygons. We can use the readShapeSpatial() function from the maptools package:
regions <- readShapeSpatial("data/prov2011_g.shp")
This function creates an object that contains several pieces of information, including the polygons we need to draw.
We now need the unemployment data; again from ISTAT we can download some unemployment data based on administrative regions: it is a zipped file containing a CSV. This file contains the number of unemployed people for each region from 2004 to 2012: we'll use the 2012 data. Let's load the file and remove the data we don't need:
# reads the unemployment data
unemployment <- read.csv(file="data/unemployment.csv")

# selects only the data we are interested (the name and the year 2012)
unemployment <- subset(unemployment, select=c(1,10))
To create a choropleth we have to plot the areas corresponding to the different administrative regions in different colors, and to do that, we need to add to the shapefile the values of unemployment.
Since it is not possible to merge data into a shapefile, we need to add a whole column containing the unemployment values to it. Be careful: the regions in the shapefile have an order, and the column of values to add must refer to the regions in the same order! To do that, we have to add to the shapefile object the unemployment values ordered as the regions are:
# adds the unemployment values to the shapefile matching on regions' names (the column name is NOME_PRO)
regions@data = data.frame(regions@data, unemployment[match(regions@data$NOME_PRO, unemployment$NOME_PRO),])
We're almost done: we have the borders of every region and the value associated to it, so we can draw the choropleth. We'll use the spplot() function of sp package; this function can be used to plot a spatial area using data from a shapefile, and we can tell this function to use a different color for every region, according to the magnitude of the unemployment data; the parameters are:
  • regions: the shapefile
  • "Year.2012": the column name of the unemployment data (used by spplot for choosing a color)
  • col.regions: here we tell spplot the palette to use for coloring the regions
  • main: the title of the plot
In this plot we'll use a gradient of grays that goes from 0.9 to 0.2 (from light gray to dark gray):
# plots it
spplot(regions, "Year.2012", col.regions=gray.colors(32, 0.9, 0.2), main="Unemployment in 2012")
We then add a legend:
grid.text("Number of unemployed people in thousands", x=unit(0.95, "npc"), y=unit(0.50, "npc"), rot=90,  gp = gpar(fontsize = 12))
and here is the result:
Looking at the choropleth map, apart from a couple of outliers located in the major cities, the unemployment values seem pretty much the same over the whole country; this visual effect is due to the outliers, which squeeze the other values into a smaller range of colors and thus give the impression of near equality.
Let's try to remove the outliers (setting them to 0), and see how it changes:
# removes the outliers
unemployment$Year.2012[which(unemployment$NOME_PRO == 'NAPOLI' | unemployment$NOME_PRO == 'TORINO' | unemployment$NOME_PRO == 'ROMA'| unemployment$NOME_PRO == 'MILANO')] <- 0

# reloads the shapefile
regions <- readShapeSpatial("data/prov2011_g.shp")

# adds the new unemployment data (without outliers)
regions@data = data.frame(regions@data, unemployment[match(regions@data$NOME_PRO, unemployment$NOME_PRO),])

# plots it, legend included
spplot(regions, "Year.2012", col.regions=gray.colors(32, 0.9, 0.2), main="Unemployment in 2012")
grid.text("Number of unemployed people in thousands (excluded Milan,Turin,Rome,Naples)", x=unit(0.95, "npc"), y=unit(0.50, "npc"), rot=90,  gp = gpar(fontsize = 12))
This is the result:
Now that there are no more outliers, the maximum value has dropped from more than 200,000 to less than 100,000; the result is that we can better see the distribution of unemployed people in the other regions. But we're still not satisfied with these results, because we realize we'd rather look at the unemployment rate in Italy than at absolute numbers, as it gives a better description of the phenomenon.
As we did before, we have to find some data. To compute the unemployment rate from the absolute numbers, we just need the population of every region for the same year we have unemployment data for; then we can divide the number of unemployed people by the population to get the unemployment rate.

First we need to find the Italian population per region; once again, ISTAT has the data we need. The zipped file is here: http://demo.istat.it/pop2012/dati/province.zip; this archive contains a CSV with the population of every sub-region of each region, with male and female data kept separate: this means that we have to sum all the values belonging to the same region to get its population. Let's see how.
First of all, we load and clean the data:
# loads data from CSV
population <- read.csv(file="data/population_per_region.csv")

# leaves only the three columns we're interested in (name, total_males, total_females)
population <- subset(population, select=c(2,8,13))

# adds a "Total" column to the dataset with the sum of total_males and total_females
population$Total <- population$Totale_Maschi + population$Totale_Femmine

# capitalize the names of the regions
population$Descrizione_Provincia <- toupper(population$Descrizione_Provincia)
Now we have to aggregate the rows by region name, sum their values, and assign the result to a data frame:
# now we group by "Descrizione_Provincia" (the region name) and sum the "Total" column
pop <- as.data.frame(aggregate(population$Total, by=list(population$Descrizione_Provincia), sum))

# renames the columns for clarity
colnames(pop)[1] <- "Descrizione_Provincia"
colnames(pop)[2] <- "Total"
We're now ready to compute the rate. The original unemployment numbers are in thousands, while the population is not, so we have to multiply the former by 1000 to get comparable values, and then compute the rate by dividing the two numbers (we also multiply by 100 to get a percentage instead of a value in [0,1]):
# computes the number of unemployed 
pop$unemployed <- unemployment$Year.2012[match(pop$Descrizione_Provincia, unemployment$NOME_PRO)]*1000

# computes the percentage of unemployed people related to total population
pop$rate <- pop$unemployed / pop$Total * 100
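Before plotting, it's useful to glance at the range of the computed rates, to make sure the values look plausible as percentages:
# distribution of the computed unemployment rates (in percent)
summary(pop$rate)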
We now have the unemployment rate; we just need to update the shapefile with this new data:
# updates the shapefile with these new data
regions@data = data.frame(regions@data, pop[match(regions@data$NOME_PRO, pop$Descrizione_Provincia),])
and plot again:
# plots it
cols <- gray.colors(32, 0.9, 0.2)
spplot(regions, "rate", col.regions=cols, main="Unemployment in 2012")
grid.text("% of unemployed population", x=unit(0.95, "npc"), y=unit(0.50, "npc"), rot=90,  gp = gpar(fontsize = 14))
Here is the final result:
Finally, this choropleth gives us an idea of unemployment in Italy.


Here's the full code, also available on my GitHub:
# you can also download all the datafiles from my github:
# https://github.com/andreaiacono/playingwithdata/tree/master/data

library(maptools) # for reading shapefiles
library(grid) # for adding a title to the legend of the plot

### loads data ###

# sets current directory
setwd("/home/andrea/Programming/code/R/PlayingWithData/")

# reads the shapefile of administrative regions
regions <- readShapeSpatial("data/prov2011_g.shp")

# reads the unemployment data
unemployment <- read.csv(file="data/unemployment.csv")

# selects only the data we are interested in (the name and the year 2012)
unemployment <- subset(unemployment, select=c(1,10))

# adds the unemployment value to the shapefile
regions@data = data.frame(regions@data, unemployment[match(regions@data$NOME_PRO, unemployment$NOME_PRO),])

# plots it
spplot(regions, "Year.2012", col.regions=gray.colors(32, 0.9, 0.2), main="Unemployment in 2012")

# adds a title to the legend
grid.text("Number of unemployed people in thousands", x=unit(0.95, "npc"), y=unit(0.50, "npc"), rot=90,  gp = gpar(fontsize = 12))

# removes the outliers
unemployment$Year.2012[which(unemployment$NOME_PRO == 'NAPOLI' | unemployment$NOME_PRO == 'TORINO' | unemployment$NOME_PRO == 'ROMA'| unemployment$NOME_PRO == 'MILANO')] <- 0

# reloads the shapefile
regions <- readShapeSpatial("data/prov2011_g.shp")

# adds the new unemployment data (without outliers)
regions@data = data.frame(regions@data, unemployment[match(regions@data$NOME_PRO, unemployment$NOME_PRO),])

# plots it, legend included
spplot(regions, "Year.2012", col.regions=gray.colors(32, 0.9, 0.2), main="Unemployment in 2012")
grid.text("Number of unemployed people in thousands (excluded Milan,Turin,Rome,Naples)", x=unit(0.95, "npc"), y=unit(0.50, "npc"), rot=90,  gp = gpar(fontsize = 12))

# now we want the unemployment rate instead of the absolute numbers, so we load the population of every region to compute it

# loads data from CSV
population <- read.csv(file="data/population_per_region.csv")

# leaves only the three columns we're interested in (name, total_males, total_females)
population <- subset(population, select=c(2,8,13))

# adds a "Total" column to the dataset with the sum of total_males and total_females
population$Total <- population$Totale_Maschi + population$Totale_Femmine

# capitalize the names of the regions
population$Descrizione_Provincia <- toupper(population$Descrizione_Provincia)

# now we group by "Descrizione_Provincia" and sum the "Total" column
pop <- as.data.frame(aggregate(population$Total, by=list(population$Descrizione_Provincia), sum))

# rename the columns
colnames(pop)[1] <- "Descrizione_Provincia"
colnames(pop)[2] <- "Total"

# computes the number of unemployed 
pop$unemployed <- unemployment$Year.2012[match(pop$Descrizione_Provincia, unemployment$NOME_PRO)]*1000

# computes the rate of unemployed people related to total population
pop$rate <- pop$unemployed / pop$Total * 100

# updates the shapefile with these new data
regions@data = data.frame(regions@data, pop[match(regions@data$NOME_PRO, pop$Descrizione_Provincia),])

# and plots it
cols <- gray.colors(32, 0.9, 0.2)
spplot(regions, "rate", col.regions=cols, main="Unemployment in 2012")
grid.text("% of unemployed population", x=unit(0.95, "npc"), y=unit(0.50, "npc"), rot=90,  gp = gpar(fontsize = 14))

Thursday, January 9, 2014

How to draw data on a map with R

In this post we'll see how to use R language to plot some data on a map; the full code is available on my GitHub.
I've used data freely available from the Italian Ministry of Education about the rate of school abandonment in Italy. It is available for download (in the form of zipped files) from this page: http://archivio.pubblica.istruzione.it/scuola_in_chiaro/open_data/index.html.

First we need the data about the schools (including their geolocation). Once downloaded and unzipped, we can load it with R:
schools <- read.csv(file="data/schools.csv")
This dataset contains, for each school, a unique code, its complete address, the lat/lon coordinates and other info; since we want to display this data on a map, we remove the rows that don't have lat/lon info:
schools <- schools[which(schools$LATITUDINE != "NA"),]
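Note that the filter above compares the coordinates against the literal string "NA"; if read.csv parsed the missing coordinates as real NA values instead, the equivalent filter would be (just an alternative, depending on how the file is read):
# equivalent filter when the missing coordinates are real NA values
schools <- schools[!is.na(schools$LATITUDINE) & !is.na(schools$LONGITUDINE),]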
Since for the map we only need the school's unique code and its coordinates, we can drop all the other columns and create a new data frame with just those three:
schools_geo <- data.frame(schools$codice_scuola, schools$LATITUDINE, schools$LONGITUDINE)
We now have a clean dataset of the Italian schools.
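A quick look at the first rows shows the three columns we kept; note that data.frame() has prefixed the column names with the name of the original dataset, and these are the names we'll refer to later:
# first rows: the columns are named schools.codice_scuola, schools.LATITUDINE and schools.LONGITUDINE
head(schools_geo)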

The abandonment archive contains several files; the one we're interested in is "abbandoni.csv": this dataset contains the unique code of the school, the abandonment rate of its students (as a percentage) and other info. Let's load the dataset:
abandonments <- read.csv(file="data/abandonments.csv")
Looking at the data we see that some schools don't have the abandonment rate (the value is 'NA' instead of a number), so we'll exclude those schools from our dataset:
abandonments <- abandonments[which(abandonments$scuola != "NA"),]
Since the column holding the school's unique code has a different name in the two datasets, we'll rename one of the two to match the other (the name of the column is 'cod_scuola'):
colnames(schools_geo)[1] <- "cod_scuola"
Now we merge the two dataset so that we have on the same row the abandonment rate and the lat/lon info:
data <- merge(abandonments, schools_geo, by="cod_scuola")
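We can check how many schools survived the merge, i.e. have both an abandonment rate and coordinates:
# number of rows of the merged dataset
nrow(data)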
If we drew on the map all the schools, even the ones with an abandonment rate of 0, we'd have too much data; to make the visualization clearer, let's get rid of the schools where the abandonment rate is equal to 0:
data <- data[which(data$scuola>0),]
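Since we'll scale each circle by the abandonment rate (with cex=data$scuola/15), it's worth checking the range of the rates first:
# distribution of the abandonment rate (in percent), used below to size the circles
summary(data$scuola)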
The data is now ok. Let's set up the map. We'll use an R package called RgoogleMaps; we'll first define a rectangle of lat/lon coordinates for getting an image of Italy from Google Maps:
# loads the package used to retrieve and plot the Google Maps tiles
library(RgoogleMaps)

# setup of latitude and longitude centered on Italy
lat_c<-42.1
lon_c<-12.5

# gets a rectangle of coordinates
rectangle<-qbbox(lat = c(lat_c[1]+5, lat_c[1]-5), lon = c(lon_c[1]+5, lon_c[1]-5))

# retrieves a map from googlemaps
map<-GetMap.bbox(rectangle$lonR, rectangle$latR)
Now that we have the map, we can plot the abandonment rate on it; we will show the abandonment rate as a circle centered on the lat/lon coordinates of each school. The PlotOnStaticMap function of the RgoogleMaps package takes these parameters:
  • a map: the one we just have created
  • lat/lon: the coordinates where to plot
  • pch: what to plot; 1 means an empty circle
  • cex: the radius of the circle; we use the abandonment rate so that the bigger the rate, the bigger the circle
  • col: the color of the plot
PlotOnStaticMap(MyMap=map, lat=data$schools.LATITUDINE, lon=data$schools.LONGITUDINE, TrueProj=TRUE, cex=data$scuola/15, pch=1, col="blue")
We also want a legend that describes the circles' values; these are the parameters we pass to legend():
  • x,y: the top-left corner of the box
  • x.intersp: the horizontal distance between the circle and its description
  • y.intersp: the vertical distance between one row and the next
  • legend: the descriptions of the legend's items
  • col: the color of the circles
  • pch: the symbol to use (a circle)
  • cex: the font size of the text
  • pt.cex: the size of the circles
legend(x=120, y=300, title="Abandonment rate", x.intersp=2, y.intersp=c(1.1,1.1,1.1,1.1,1.15,1.2),legend=c("< 1%", "1% - 5%", "5% - 10%", "10% - 20%", "20% - 50%", "> 50%"), col="blue",pch=1,cex=0.8, pt.cex=c(1/15, 5/15, 10/15, 20/15, 50/15, 70/15)) 
This is the image returned:
From this image we can quickly see that school abandonment is more frequent in southern Italy and in the big cities (Milan, Turin, Genoa, Naples, Palermo) throughout the country.