The temperature data are taken from http://archivio-meteo.distile.it/tabelle-dati-archivio-meteo/. Since that page shows the data only in tabular form, we had to sniff the HTTP conversation to find out that the data come from this URL and are in JSON format.
Using Jackson, we transformed this JSON into a format that is simpler to use with Hadoop: CSV. The result of the conversion looks like this:
    01012000,-4.0,5.0
    02012000,-5.0,5.1
    03012000,-5.0,7.7
    04012000,-3.0,9.7
    ...

If you're curious to see how we transformed it, take a look at the source code.
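To make the format concrete: judging from the sample, each line seems to hold a date in ddMMyyyy form followed by the minimum and the maximum temperature of that day. Here is a minimal plain-Java sketch of parsing one such line. The class names CsvLineDemo and DailyReading are made up for illustration and are not part of the original source code, and the date layout is an assumption inferred from the sample.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

class CsvLineDemo {

    // Assumed date layout, inferred from the sample: day, month, year with no separators.
    static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("ddMMyyyy");

    // Hypothetical holder for one parsed line.
    static final class DailyReading {
        final LocalDate date;
        final double min;
        final double max;

        DailyReading(LocalDate date, double min, double max) {
            this.date = date;
            this.min = min;
            this.max = max;
        }
    }

    // Splits "ddMMyyyy,min,max" and converts each field.
    static DailyReading parse(String line) {
        String[] fields = line.split(",");
        if (fields.length != 3) {
            throw new IllegalArgumentException("Expected 3 fields: " + line);
        }
        return new DailyReading(
                LocalDate.parse(fields[0], DATE_FORMAT),
                Double.parseDouble(fields[1]),
                Double.parseDouble(fields[2]));
    }

    public static void main(String[] args) {
        DailyReading r = parse("04012000,-3.0,9.7");
        System.out.println(r.date + " min=" + r.min + " max=" + r.max);
    }
}
```

In the job itself we don't need a full date, only the month and year, so a plain substring of the first field is enough.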
Let's look at the mapper class for this job:
    public static class MeanMapper extends Mapper<Object, Text, Text, SumCount> {

        private final int DATE = 0;
        private final int MIN = 1;
        private final int MAX = 2;

        private Map<Text, List<Double>> maxMap = new HashMap<>();

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {

            // gets the fields of the CSV line
            String[] values = value.toString().split(",");

            // defensive check
            if (values.length != 3) {
                return;
            }

            // gets date and max temperature
            String date = values[DATE];
            Text month = new Text(date.substring(2));
            Double max = Double.parseDouble(values[MAX]);

            // if not present, put this month into the map
            if (!maxMap.containsKey(month)) {
                maxMap.put(month, new ArrayList<Double>());
            }

            // adds the max temperature for this day to the list of temperatures
            maxMap.get(month).add(max);
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {

            // loops over the months collected in the map() method
            for (Text month: maxMap.keySet()) {

                List<Double> temperatures = maxMap.get(month);

                // computes the sum of the max temperatures for this month
                Double sum = 0d;
                for (Double max: temperatures) {
                    sum += max;
                }

                // emits the month as the key and a SumCount as the value
                context.write(month, new SumCount(sum, temperatures.size()));
            }
        }
    }

As we've seen in the last posts (about optimization and combiners), the mapper first puts the values into a map; when the input is over, the cleanup() method loops over the keys, sums the values and emits them. Note that we use the SumCount class, a utility class that wraps the two values we need to compute a mean: the sum of all the values and the number of values.
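The SumCount class itself is not shown in this post. As a rough idea of what such a class could look like, here is a Hadoop-free sketch under a hypothetical name, SumCountSketch. It uses primitive fields, so its getters return plain values rather than the wrapper objects the real class appears to return. Its write() and readFields() methods have the same signatures as those of Hadoop's Writable interface, which the real class presumably implements so that it can travel from mapper to reducer. The roundTrip() helper is only there to show serialization at work.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical, Hadoop-free stand-in for SumCount; the real class is not shown here.
class SumCountSketch {

    private double sum;
    private int count;

    SumCountSketch() {
        this(0d, 0);
    }

    SumCountSketch(double sum, int count) {
        this.sum = sum;
        this.count = count;
    }

    // Merges another partial result into this one.
    void addSumCount(SumCountSketch other) {
        this.sum += other.sum;
        this.count += other.count;
    }

    double getSum() { return sum; }

    int getCount() { return count; }

    // Same method signatures as Hadoop's Writable interface.
    public void write(DataOutput out) throws IOException {
        out.writeDouble(sum);
        out.writeInt(count);
    }

    public void readFields(DataInput in) throws IOException {
        sum = in.readDouble();
        count = in.readInt();
    }

    // Serializes a value to bytes and reads it back into a fresh object.
    static SumCountSketch roundTrip(SumCountSketch original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.write(new DataOutputStream(bytes));
            SumCountSketch copy = new SumCountSketch();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        SumCountSketch copy = roundTrip(new SumCountSketch(30.0, 2));
        System.out.println(copy.getSum() + " / " + copy.getCount());
    }
}
```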
A common error in this kind of computation is to make the mapper emit the mean directly. Let's see what can happen if we have a dataset like this:

    01012000,0,10.0
    02012000,0,20.0
    03012000,0,2.0
    04012000,0,4.0
    05012000,0,3.0

and two mappers, which receive the first two and the last three lines respectively. The first mapper computes a mean of 15.0, given by (10.0 + 20.0) / 2. The second computes a mean of 3.0, given by (2.0 + 4.0 + 3.0) / 3. When the reducer receives these two values, it sums them and divides by two, so the mean will be 9.0, given by (15.0 + 3.0) / 2. But the correct mean for the values in this example is 7.8, given by (10.0 + 20.0 + 2.0 + 4.0 + 3.0) / 5.
This error arises because each mapper can receive any number of lines, so the mean it emits is only part of the information needed to compute the overall mean: it no longer says how many values it was computed from.
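The arithmetic of this example is easy to check with a few lines of plain Java. This is only an illustration without Hadoop: the class MeanOfMeansDemo and its methods are made up for the purpose, and the two lists stand in for the lines received by the two mappers.

```java
import java.util.Arrays;
import java.util.List;

class MeanOfMeansDemo {

    // Plain arithmetic mean of a list of values.
    static double mean(List<Double> values) {
        double sum = 0d;
        for (double v : values) {
            sum += v;
        }
        return sum / values.size();
    }

    // The flawed approach: each "mapper" emits its own mean,
    // then the "reducer" averages those means.
    static double meanOfMeans(List<List<Double>> splits) {
        double sum = 0d;
        for (List<Double> split : splits) {
            sum += mean(split);
        }
        return sum / splits.size();
    }

    public static void main(String[] args) {
        List<Double> firstSplit = Arrays.asList(10.0, 20.0);
        List<Double> secondSplit = Arrays.asList(2.0, 4.0, 3.0);

        System.out.println("mean of means: " + meanOfMeans(Arrays.asList(firstSplit, secondSplit)));
        System.out.println("true mean:     " + mean(Arrays.asList(10.0, 20.0, 2.0, 4.0, 3.0)));
    }
}
```

The two printed values are 9.0 and 7.8, the same numbers as in the example.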
If, instead of emitting the mean, we emit the sum of the values and the number of values, we can overcome the problem. In the example we saw before, the first mapper will emit the pair (30.0, 2) and the second (9.0, 3); if we add up the sums and divide by the total count, we obtain the right result: (30.0 + 9.0) / (2 + 3) = 7.8.
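A useful consequence is that the result no longer depends on how the lines are distributed among the mappers. The following plain-Java sketch (again without Hadoop, with made-up names, and with a two-element array standing in for the pair) combines the partial results of two different splits of the same five values.

```java
import java.util.Arrays;
import java.util.List;

class SplitIndependenceDemo {

    // What one "mapper" would emit for its share of the lines: {sum, count}.
    static double[] partial(List<Double> split) {
        double sum = 0d;
        for (double v : split) {
            sum += v;
        }
        return new double[] { sum, split.size() };
    }

    // What the "reducer" does: add up sums and counts, then divide once at the end.
    static double combine(List<double[]> partials) {
        double sum = 0d;
        double count = 0d;
        for (double[] p : partials) {
            sum += p[0];
            count += p[1];
        }
        return sum / count;
    }

    public static void main(String[] args) {
        // The split from the text: two lines to the first mapper, three to the second.
        double twoAndThree = combine(Arrays.asList(
                partial(Arrays.asList(10.0, 20.0)),
                partial(Arrays.asList(2.0, 4.0, 3.0))));

        // A different, arbitrary split of the same five values.
        double fourAndOne = combine(Arrays.asList(
                partial(Arrays.asList(10.0, 20.0, 2.0, 4.0)),
                partial(Arrays.asList(3.0))));

        System.out.println(twoAndThree + " " + fourAndOne);
    }
}
```

Both splits give 7.8, because the division happens only once, after all the sums and counts have been added together.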
Let's get back to our job and look at the reducer:
    public static class MeanReducer extends Reducer<Text, SumCount, Text, DoubleWritable> {

        private Map<Text, SumCount> sumCountMap = new HashMap<>();

        @Override
        public void reduce(Text key, Iterable<SumCount> values, Context context) throws IOException, InterruptedException {

            SumCount totalSumCount = new SumCount();

            // loops over all the SumCount objects received for this month (the "key" param)
            for (SumCount sumCount : values) {

                // sums all of them
                totalSumCount.addSumCount(sumCount);
            }

            // puts the resulting SumCount into a map
            sumCountMap.put(new Text(key), totalSumCount);
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {

            // loops over the months collected in the reduce() method
            for (Text month: sumCountMap.keySet()) {

                double sum = sumCountMap.get(month).getSum().get();
                int count = sumCountMap.get(month).getCount().get();

                // emits the month and the mean of the max temperatures for the month
                context.write(month, new DoubleWritable(sum/count));
            }
        }
    }

The reducer is simpler because it just has to retrieve all the SumCount objects emitted by the mappers and add them together. After receiving all the input, it loops over the map of SumCount objects and emits each month with its mean.
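To see the whole flow at a glance without a Hadoop cluster, here is a small plain-Java simulation of it. It is only a sketch under some assumptions: the class and method names are made up, a two-element array stands in for SumCount, and the grouping by month that Hadoop performs between the map and the reduce phase is imitated with a TreeMap. The January lines come from the CSV sample at the beginning of the post, while the February lines are invented for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class MonthlyMeanSimulation {

    // "Map" side: turns CSV lines into per-month {sum of max temperatures, count}.
    static Map<String, double[]> mapSide(List<String> lines) {
        Map<String, double[]> partials = new TreeMap<>();
        for (String line : lines) {
            String[] fields = line.split(",");
            if (fields.length != 3) {
                continue; // same defensive check as in the mapper
            }
            String month = fields[0].substring(2); // "ddMMyyyy" -> "MMyyyy"
            double[] sumCount = partials.computeIfAbsent(month, k -> new double[2]);
            sumCount[0] += Double.parseDouble(fields[2]);
            sumCount[1] += 1;
        }
        return partials;
    }

    // "Reduce" side: merges the partial results of all mappers by month, then divides.
    static Map<String, Double> reduceSide(List<Map<String, double[]>> mapperOutputs) {
        Map<String, double[]> totals = new TreeMap<>();
        for (Map<String, double[]> output : mapperOutputs) {
            for (Map.Entry<String, double[]> e : output.entrySet()) {
                double[] total = totals.computeIfAbsent(e.getKey(), k -> new double[2]);
                total[0] += e.getValue()[0];
                total[1] += e.getValue()[1];
            }
        }
        Map<String, Double> means = new TreeMap<>();
        for (Map.Entry<String, double[]> e : totals.entrySet()) {
            means.put(e.getKey(), e.getValue()[0] / e.getValue()[1]);
        }
        return means;
    }

    public static void main(String[] args) {
        // Two splits, as if handled by two mappers; February lines are invented.
        List<String> firstSplit = Arrays.asList(
                "01012000,-4.0,5.0", "02012000,-5.0,5.1", "01022000,0.0,8.0");
        List<String> secondSplit = Arrays.asList(
                "03012000,-5.0,7.7", "04012000,-3.0,9.7", "02022000,0.0,10.0");

        List<Map<String, double[]>> outputs = new ArrayList<>();
        outputs.add(mapSide(firstSplit));
        outputs.add(mapSide(secondSplit));

        // Prints one mean per month key.
        System.out.println(reduceSide(outputs));
    }
}
```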