Sunday, April 6, 2014

Computing mean with MapReduce

In this post we'll see how to compute the mean of the max temperatures of every month for the city of Milan.
The temperature data are taken from http://archivio-meteo.distile.it/tabelle-dati-archivio-meteo/, but since the site shows them in tabular form, we had to sniff the HTTP conversation to find out that the data come from this URL and are in JSON format.
Using Jackson, we transformed this JSON into a format that is simpler to use with Hadoop: CSV, where each line contains the date (in ddMMyyyy format), the minimum temperature and the maximum temperature. The result of the conversion is this:
01012000,-4.0,5.0
02012000,-5.0,5.1
03012000,-5.0,7.7
04012000,-3.0,9.7
...
If you're curious to see how we transformed it, take a look at the source code.

Let's look at the mapper class for this job:
public static class MeanMapper extends Mapper<Object, Text, Text, SumCount> {

    private final int DATE = 0;
    private final int MIN = 1;
    private final int MAX = 2;

    private Map<Text, List<Double>> maxMap = new HashMap<>();
 
    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {

        // gets the fields of the CSV line
        String[] values = value.toString().split((","));

        // defensive check
        if (values.length != 3) {
            return;
        }

        // gets date and max temperature
        String date = values[DATE];
        Text month = new Text(date.substring(2));
        Double max = Double.parseDouble(values[MAX]);

        // if not present, put this month into the map
        if (!maxMap.containsKey(month)) {
            maxMap.put(month, new ArrayList<Double>());
        }

        // adds the max temperature for this day to the list of temperatures
        maxMap.get(month).add(max);
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {

        // loops over the months collected in the map() method
        for (Text month: maxMap.keySet()) {

            List<Double> temperatures = maxMap.get(month);

            // computes the sum of the max temperatures for this month
            Double sum = 0d;
            for (Double max: temperatures) {
                sum += max;
            }

            // emits the month as the key and a SumCount as the value
            context.write(month, new SumCount(sum, temperatures.size()));
        }
    }
}
As we've seen in the previous posts (about optimization and combiners), the mapper first collects the values into a map and, once the input is over, loops over the keys in cleanup() to sum the values and emit them. Note that we use the SumCount class, a utility class that wraps the two values we need to compute a mean: the sum of all the values and the number of values.
A common mistake in this kind of computation is to have the mapper emit the mean directly; let's see what can happen with a dataset like this:
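To see what the mapper actually emits, here is a minimal plain-Java sketch of the same in-mapper aggregation that runs without Hadoop. The class `InMapperSketch`, the method `mapSplit` and the simplified `SumCount` (plain `double` and `int` fields instead of the Writable wrappers that the post's `getSum().get()` calls suggest) are hypothetical; the parsing mirrors the mapper above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch with hypothetical names; no Hadoop types involved.
public class InMapperSketch {

    // Simplified stand-in for the post's SumCount: a running sum and a count.
    static final class SumCount {
        double sum;
        int count;

        SumCount(double sum, int count) {
            this.sum = sum;
            this.count = count;
        }

        @Override
        public String toString() {
            return "(" + sum + ", " + count + ")";
        }
    }

    // Simulates one mapper: collects the max temperatures per month while reading
    // its lines, then (like cleanup()) turns each month's list into a SumCount.
    static Map<String, SumCount> mapSplit(List<String> lines) {
        Map<String, List<Double>> maxMap = new TreeMap<>();
        for (String line : lines) {
            String[] values = line.split(",");
            if (values.length != 3) {
                continue; // same defensive check as the mapper
            }
            String month = values[0].substring(2);   // "ddMMyyyy" -> "MMyyyy"
            double max = Double.parseDouble(values[2]);
            maxMap.computeIfAbsent(month, m -> new ArrayList<>()).add(max);
        }
        Map<String, SumCount> emitted = new TreeMap<>();
        for (Map.Entry<String, List<Double>> e : maxMap.entrySet()) {
            double sum = 0.0;
            for (double max : e.getValue()) {
                sum += max;
            }
            emitted.put(e.getKey(), new SumCount(sum, e.getValue().size()));
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> split = List.of("01012000,0,10.0", "02012000,0,20.0", "01022000,0,4.0");
        System.out.println(mapSplit(split)); // prints {012000=(30.0, 2), 022000=(4.0, 1)}
    }
}
```

A TreeMap is used here only to make the printed output deterministic; the mapper in the post uses a HashMap.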
01012000,0,10.0
02012000,0,20.0
03012000,0,2.0
04012000,0,4.0
05012000,0,3.0
and two mappers, which receive the first two and the last three lines respectively. The first mapper computes a mean of 15.0, given by (10.0 + 20.0) / 2. The second computes a mean of 3.0, given by (2.0 + 4.0 + 3.0) / 3. When the reducer receives these two values, it sums them and divides by two, so the mean will be 9.0, given by (15.0 + 3.0) / 2. But the correct mean for the values in this example is 7.8, given by (10.0 + 20.0 + 2.0 + 4.0 + 3.0) / 5.
This error occurs because each mapper can receive any number of lines, so the mean it emits is only part of the information needed: without the number of values behind each partial mean, the reducer has no way to weight it correctly.
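The pitfall is easy to reproduce in plain Java. In this sketch (hypothetical names, no Hadoop involved) each inner list stands for one mapper's input split: every "mapper" emits its local mean and a naive "reducer" averages those means, which gives every split the same weight regardless of how many values it contained.

```java
import java.util.List;

public class MeanOfMeansPitfall {

    // Plain arithmetic mean of a list of values.
    static double mean(List<Double> values) {
        double sum = 0.0;
        for (double v : values) {
            sum += v;
        }
        return sum / values.size();
    }

    // Naive approach: each split emits its own mean, the "reducer" averages them.
    // Every split gets the same weight, no matter how many values it contained.
    static double meanOfMeans(List<List<Double>> splits) {
        double sumOfMeans = 0.0;
        for (List<Double> split : splits) {
            sumOfMeans += mean(split);
        }
        return sumOfMeans / splits.size();
    }

    public static void main(String[] args) {
        List<List<Double>> splits = List.of(
                List.of(10.0, 20.0),        // first mapper: mean 15.0
                List.of(2.0, 4.0, 3.0));    // second mapper: mean 3.0
        System.out.println(meanOfMeans(splits));                       // 9.0 (wrong)
        System.out.println(mean(List.of(10.0, 20.0, 2.0, 4.0, 3.0)));  // 7.8 (correct)
    }
}
```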

If, instead of the mean, we emit the sum of the values and the number of values, we overcome the problem. In the example above, the first mapper emits the pair (30.0, 2) and the second (9.0, 3); if we add up the sums and divide by the total count, we obtain the right result: (30.0 + 9.0) / (2 + 3) = 7.8.
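A matching sketch of the fix, again in plain Java with hypothetical names: each split is reduced to a (sum, count) pair, and the final step adds up sums and counts separately and divides only once.

```java
import java.util.List;

public class SumCountMerge {

    // Hypothetical minimal pair; the post's SumCount plays this role in Hadoop.
    static final class SumCount {
        final double sum;
        final int count;

        SumCount(double sum, int count) {
            this.sum = sum;
            this.count = count;
        }
    }

    // What one mapper should emit for its split: the sum and the count, not the mean.
    static SumCount partial(List<Double> values) {
        double sum = 0.0;
        for (double v : values) {
            sum += v;
        }
        return new SumCount(sum, values.size());
    }

    // Reducer side: add up all sums and all counts, then divide once at the end.
    static double combinedMean(List<SumCount> partials) {
        double totalSum = 0.0;
        int totalCount = 0;
        for (SumCount p : partials) {
            totalSum += p.sum;
            totalCount += p.count;
        }
        return totalSum / totalCount;
    }

    public static void main(String[] args) {
        SumCount first = partial(List.of(10.0, 20.0));       // (30.0, 2)
        SumCount second = partial(List.of(2.0, 4.0, 3.0));   // (9.0, 3)
        System.out.println(combinedMean(List.of(first, second))); // 7.8
    }
}
```

Because adding sums and counts is associative and commutative, partial pairs can be merged in any order and any number of times, which is also what makes this approach safe to use in a combiner.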

Let's get back to our job and look at the reducer:
public static class MeanReducer extends Reducer<Text, SumCount, Text, DoubleWritable> {

    private Map<Text, SumCount> sumCountMap = new HashMap<>();

    @Override
    public void reduce(Text key, Iterable<SumCount> values, Context context) throws IOException, InterruptedException {

        SumCount totalSumCount = new SumCount();

        // loops over all the SumCount objects received for this month (the "key" param)
        for (SumCount sumCount : values) {

            // sums all of them
            totalSumCount.addSumCount(sumCount);
        }

        // puts the resulting SumCount into a map; the key is copied because
        // Hadoop reuses the same Text instance across reduce() calls
        sumCountMap.put(new Text(key), totalSumCount);
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {

        // loops over the months collected in the reduce() method
        for (Text month: sumCountMap.keySet()) {

            double sum = sumCountMap.get(month).getSum().get();
            int count = sumCountMap.get(month).getCount().get();

            // emits the month and the mean of the max temperatures for the month
            context.write(month, new DoubleWritable(sum/count));
        }
    }
}
The reducer is simpler: it just has to retrieve all the SumCount objects emitted by the mappers for each month and add them together. After all the input has been received, its cleanup() method loops over the map of SumCount objects and emits each month with its mean.
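The reducer's logic can be sketched the same way. This plain-Java version (hypothetical names; the February pair is a made-up value added only to show more than one key) collapses the shuffle and the per-key reduce() calls into one loop over the mappers' outputs, merges the SumCount pairs of each month into a fresh accumulator, and then computes the means as cleanup() does.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ReducerSketch {

    // Simplified SumCount with the same merge operation the reducer relies on.
    static final class SumCount {
        double sum;
        int count;

        SumCount(double sum, int count) {
            this.sum = sum;
            this.count = count;
        }

        void addSumCount(SumCount other) {
            sum += other.sum;
            count += other.count;
        }
    }

    // Groups the partial results of all mappers by month (the shuffle),
    // merges them into a new accumulator per month (reduce()),
    // then computes one mean per month (cleanup()).
    static Map<String, Double> reduceAll(List<Map<String, SumCount>> mapperOutputs) {
        Map<String, SumCount> sumCountMap = new TreeMap<>();
        for (Map<String, SumCount> output : mapperOutputs) {
            for (Map.Entry<String, SumCount> e : output.entrySet()) {
                sumCountMap.computeIfAbsent(e.getKey(), k -> new SumCount(0.0, 0))
                           .addSumCount(e.getValue());
            }
        }
        Map<String, Double> means = new TreeMap<>();
        for (Map.Entry<String, SumCount> e : sumCountMap.entrySet()) {
            means.put(e.getKey(), e.getValue().sum / e.getValue().count);
        }
        return means;
    }

    public static void main(String[] args) {
        Map<String, SumCount> mapper1 = Map.of("012000", new SumCount(30.0, 2));
        Map<String, SumCount> mapper2 = Map.of("012000", new SumCount(9.0, 3),
                                               "022000", new SumCount(12.0, 2)); // made-up values
        System.out.println(reduceAll(List.of(mapper1, mapper2))); // {012000=7.8, 022000=6.0}
    }
}
```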