Wednesday, May 21, 2014

Implementing Writable interface of Hadoop

As we saw in the previous posts, Hadoop makes heavy use of network transmission to execute its jobs. As Doug Cutting (the creator of Hadoop) explains in this post on the Lucene mailing list, java.io.Serializable is too heavyweight for Hadoop's needs, so a new interface was developed: Writable. Every object we need to emit from mappers to reducers, or to write as output, has to implement this interface so that Hadoop can transmit the data from/to the nodes in the cluster.

Hadoop comes with several wrappers around primitive types and widely used classes in Java:
For Java primitives:
  • boolean → BooleanWritable
  • byte → ByteWritable
  • short → ShortWritable
  • int → IntWritable, VIntWritable (variable-length encoding)
  • float → FloatWritable
  • long → LongWritable, VLongWritable (variable-length encoding)
  • double → DoubleWritable


For Java classes:
  • String → Text
  • byte[] → BytesWritable
  • Object → ObjectWritable
  • null → NullWritable


For Java collections:
  • arrays → ArrayWritable, ArrayPrimitiveWritable, TwoDArrayWritable
  • Map → MapWritable
  • SortedMap → SortedMapWritable
  • EnumSet → EnumSetWritable

For example, if we need a mapper to emit a String, we have to wrap the string we want to emit in a Text object.
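For instance, a word-count style mapper could wrap each token of its input in a Text object before emitting it; here is a minimal sketch (the class and field names are purely illustrative and not taken from the original code):

    public static class StringEmittingMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        private final static IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            // wrap every token of the input line in a Text object before emitting it
            for (String token : value.toString().split("\\s+")) {
                word.set(token);
                context.write(word, ONE);
            }
        }
    }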

The interface Writable defines two methods:
  • public void write(DataOutput dataOutput) throws IOException
  • public void readFields(DataInput dataInput) throws IOException
The first method, write(), writes the data to the output stream, while the second, readFields(), reads the data back from the input stream. The wrappers we saw above simply send and receive their binary representation over a stream.
Since Hadoop also needs to sort the data during the shuffle-and-sort phase, the keys must implement the Comparable interface as well; for this purpose Hadoop defines WritableComparable, an interface that extends both Writable and Comparable.
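In the org.apache.hadoop.io package this interface is, essentially, declared as:

    public interface WritableComparable<T> extends Writable, Comparable<T> {
    }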
If we need to emit a custom object for which there is no predefined wrapper, we need to create a class that implements the WritableComparable interface. In the mean example we saw in this post, we used the SumCount class, which implements WritableComparable (the source code is available on github):
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;

public class SumCount implements WritableComparable<SumCount> {

    DoubleWritable sum;
    IntWritable count;

    public SumCount() {
        set(new DoubleWritable(0), new IntWritable(0));
    }

    public SumCount(Double sum, Integer count) {
        set(new DoubleWritable(sum), new IntWritable(count));
    }

    public void set(DoubleWritable sum, IntWritable count) {
        this.sum = sum;
        this.count = count;
    }

    public DoubleWritable getSum() {
        return sum;
    }

    public IntWritable getCount() {
        return count;
    }

    public void addSumCount(SumCount sumCount) {
        set(new DoubleWritable(this.sum.get() + sumCount.getSum().get()), new IntWritable(this.count.get() + sumCount.getCount().get()));
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {

        sum.write(dataOutput);
        count.write(dataOutput);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {

        sum.readFields(dataInput);
        count.readFields(dataInput);
    }

    @Override
    public int compareTo(SumCount sumCount) {

        // compare the "sum" values first
        int comparison = sum.compareTo(sumCount.sum);

        // if they're not equal, return the result of the comparison between the "sum" values
        if (comparison != 0) {
            return comparison;
        }

        // otherwise return the result of the comparison between the "count" values
        return count.compareTo(sumCount.count);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        SumCount sumCount = (SumCount) o;

        return count.equals(sumCount.count) && sum.equals(sumCount.sum);
    }

    @Override
    public int hashCode() {
        int result = sum.hashCode();
        result = 31 * result + count.hashCode();
        return result;
    }
}
As we can see, it's very easy to implement the two methods defined by the Writable interface: they just call write() and readFields() on the Writable properties of the SumCount class. It's important to read and write the fields in the same order in both write() and readFields(), otherwise deserialization will not work. The other methods of this class are the getters, the setter and the methods required by the Comparable interface, which should be nothing new to a Java developer.
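To use such a class in a job, we also need to tell Hadoop about it in the driver; a minimal sketch of the relevant lines, assuming a Job instance named job and a mapper that emits (Text, SumCount) pairs (the exact driver code may differ):

    // declare the types emitted by the mapper, so Hadoop knows how to (de)serialize them
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(SumCount.class);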

Thursday, May 15, 2014

More about Hadoop combiners

Hadoop combiners are a very powerful tool to speed up our computations. We already saw what a combiner is in a previous post, and we also saw another form of optimization in this post. Let's put it all together to get the broader picture.
Combiners are an optimization that Hadoop can apply to perform a local reduction: the idea is to reduce the key-value pairs directly on the mapper side, to avoid transmitting all of them to the reducers.
Let's get back to the Top20 example from the previous post, which finds the 20 most used words in a text. The Hadoop counters for this job are shown below:
...
Map input records=4239
Map output records=37817
Map output bytes=359621
Input split bytes=118
Combine input records=0
Combine output records=0
Reduce input groups=4987
Reduce shuffle bytes=435261
Reduce input records=37817
Reduce output records=20
...
As these counters show, without a combiner we have 4239 input lines for the mappers and 37817 emitted key-value pairs (one per word occurrence in the text). Since no combiner is defined, the combine input and output records are 0, so the records reaching the reducers are exactly those emitted by the mappers: 37817.

Let's define a simple combiner:
    public static class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

            // computes the number of occurrences of a single word
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }
As we can see, the code has the same logic as the reducer, since its goal is the same: reducing key/value pairs.
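The combiner is not applied automatically, though: it has to be registered explicitly in the driver. A minimal sketch, assuming a Job instance named job:

    // register the combiner; Hadoop may run it zero, one or more times per map task
    job.setCombinerClass(WordCountCombiner.class);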
Running the job with the combiner set gives us this result:
...
Map input records=4239
Map output records=37817
Map output bytes=359621
Input split bytes=116
Combine input records=37817
Combine output records=20
Reduce input groups=20
Reduce shuffle bytes=194
Reduce input records=20
Reduce output records=20
...
Looking at the output from Hadoop, we see that now the combiner has 37817 input records: this means that all the records emitted by the mappers were passed to the combiner; the combiner in turn emitted only 20 records, which is exactly the number of records received by the reducers.
Wow, that's a great result! We avoided transmitting a lot of data: just 20 records instead of the 37817 we had without the combiner.

But there's a big caveat when using combiners: since they are an optimization, Hadoop does not guarantee that they will actually be executed. So, what can we do to ensure a reduction at the mapper level? Simple: we can put the logic of the reducer inside the mapper!

This is exactly what we did in the mapper of this post. This pattern is called the "in-mapper combiner": part of the reduction is performed at the mapper level, so that the number of key-value pairs sent to the reducers is minimized.
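A minimal sketch of what an in-mapper-combining word-count mapper could look like (the class name is illustrative; the per-task map is flushed in cleanup(), which Hadoop calls once at the end of each map task):

    public static class InMapperCombiningMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        // per-task buffer holding a partial count for each word
        private final Map<String, Integer> counts = new HashMap<String, Integer>();

        @Override
        public void map(LongWritable key, Text value, Context context) {

            // aggregate locally instead of emitting one pair per word occurrence
            for (String token : value.toString().split("\\s+")) {
                Integer current = counts.get(token);
                counts.put(token, current == null ? 1 : current + 1);
            }
        }

        @Override
        public void cleanup(Context context) throws IOException, InterruptedException {

            // emit each word only once per map task, with its locally aggregated count
            for (Map.Entry<String, Integer> entry : counts.entrySet()) {
                context.write(new Text(entry.getKey()), new IntWritable(entry.getValue()));
            }
        }
    }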
Let's see the Hadoop output with this pattern (in-mapper combiner, without the stand-alone combiner):
...
Map input records=4239
Map output records=4987
Map output bytes=61522
Input split bytes=118
Combine input records=0
Combine output records=0
Reduce input groups=4987
Reduce shuffle bytes=71502
Reduce input records=4987
Reduce output records=20
...
Compared with the run that used no combining at all, this mapper emits only 4987 records to the reducers instead of 37817: a big reduction, even if not as dramatic as the one obtained with the stand-alone combiner.
And what happens if we decide to couple the in-mapper combiner pattern with the stand-alone combiner? Well, we get the best of both:
...
Map input records=4239
Map output records=4987
Map output bytes=61522
Input split bytes=116
Combine input records=4987
Combine output records=20
Reduce input groups=20
Reduce shuffle bytes=194
Reduce input records=20
Reduce output records=20
...
In this last case we get the best performance: the mapper already emits a reduced number of records, and the combiner (if it is executed) shrinks the amount of data to be transmitted even further. The only downside of this approach I can think of is that it takes more time to code.
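For reference, coupling the two approaches in the driver just means pointing the job at the in-mapper-combining mapper and registering the stand-alone combiner as well; a sketch with illustrative class names (WordCountReducer here is hypothetical):

    // hypothetical driver fragment combining both optimizations
    job.setMapperClass(InMapperCombiningMapper.class);
    job.setCombinerClass(WordCountCombiner.class);
    job.setReducerClass(WordCountReducer.class);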