Wednesday, May 21, 2014

Implementing Writable interface of Hadoop

As we saw in the previous posts, Hadoop makes heavy use of network transmissions to execute its jobs. As Doug Cutting (the creator of Hadoop) explains in this post on the Lucene mailing list, java.io.Serializable is too heavy for Hadoop's needs, so a new interface was developed: Writable. Every object you emit from a mapper to the reducers, or as job output, has to implement this interface so that Hadoop can transmit the data between the nodes in the cluster.

Hadoop comes with several wrappers around primitive types and widely used classes in Java:

Java primitive    Writable implementation
boolean           BooleanWritable
byte              ByteWritable
short             ShortWritable
int               IntWritable
                  VIntWritable
float             FloatWritable
long              LongWritable
                  VLongWritable
double            DoubleWritable

Java class        Writable implementation
String            Text
byte[]            BytesWritable
Object            ObjectWritable
null              NullWritable

Java collection   Writable implementation
array             ArrayWritable
                  ArrayPrimitiveWritable
                  TwoDArrayWritable
Map               MapWritable
SortedMap         SortedMapWritable
EnumSet           EnumSetWritable

For example, if we need a mapper to emit a String, we need to use a Text object wrapped around the string we want to emit.

The interface Writable defines two methods:
  • public void write(DataOutput dataOutput) throws IOException
  • public void readFields(DataInput dataInput) throws IOException
The first method, write(), writes the object's data to the stream, while the second, readFields(), reads that data back from the stream. The wrappers we saw above just send and receive their binary representation over a stream.
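To make the write()/readFields() pair more concrete, here is a minimal round-trip sketch. So that it runs without the Hadoop jars, it declares a local stand-in interface (SimpleWritable) with the same two method signatures as Writable; IntBox and RoundTripDemo are hypothetical names used only for illustration, not Hadoop classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Local stand-in with the same two method signatures as Hadoop's Writable,
// declared here only so the sketch compiles without the Hadoop jars.
interface SimpleWritable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}

// Hypothetical wrapper around an int, in the spirit of IntWritable.
class IntBox implements SimpleWritable {
    private int value;

    IntBox() { }

    IntBox(int value) { this.value = value; }

    int get() { return value; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(value); // an int is written as 4 bytes
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        value = in.readInt(); // overwrite this instance's state from the stream
    }
}

class RoundTripDemo {
    // Serializes a writable into an in-memory byte array.
    static byte[] serialize(SimpleWritable w) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            w.write(out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Fills an existing instance from a byte array.
    static void deserialize(SimpleWritable w, byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            w.readFields(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] data = serialize(new IntBox(163));
        IntBox copy = new IntBox();
        deserialize(copy, data);
        System.out.println(data.length + " bytes, value " + copy.get());
    }
}
```

Note that readFields() fills an already existing object instead of returning a new one, which is why the sketch needs the no-argument constructor.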
Since Hadoop also needs to sort data during the shuffle-and-sort phase, keys must implement the Comparable interface as well, so Hadoop defines WritableComparable, an interface that extends both Writable and Comparable.
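As a rough sketch of what the combined interface buys us, the snippet below sorts a few keys by their compareTo() method. The interfaces are local stand-ins that mirror the shape of Writable and WritableComparable so the example runs without Hadoop; YearKey and SortDemo are hypothetical names, and the sort is a plain Collections.sort(), not Hadoop's actual shuffle code.

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Local stand-ins mirroring the shape of Writable and WritableComparable.
interface SimpleWritable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}

interface SimpleWritableComparable<T> extends SimpleWritable, Comparable<T> { }

// Hypothetical key type: serializable and sortable at the same time.
class YearKey implements SimpleWritableComparable<YearKey> {
    private int year;

    YearKey() { }

    YearKey(int year) { this.year = year; }

    int get() { return year; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(year);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        year = in.readInt();
    }

    @Override
    public int compareTo(YearKey other) {
        return Integer.compare(year, other.year); // ascending order
    }
}

class SortDemo {
    // Wraps the years in keys, sorts the keys, and unwraps them again.
    static List<Integer> sortedYears(int... years) {
        List<YearKey> keys = new ArrayList<>();
        for (int y : years) {
            keys.add(new YearKey(y));
        }
        Collections.sort(keys); // relies on YearKey.compareTo()
        List<Integer> result = new ArrayList<>();
        for (YearKey key : keys) {
            result.add(key.get());
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(sortedYears(2014, 1999, 2006)); // prints [1999, 2006, 2014]
    }
}
```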
If we need to emit a custom object which has no default wrapper, we need to create a class that implements the WritableComparable interface. In the mean example we saw in this post, we used the SumCount class, which implements WritableComparable (the source code is available on GitHub):
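import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;

public class SumCount implements WritableComparable<SumCount> {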

    DoubleWritable sum;
    IntWritable count;

    public SumCount() {
        set(new DoubleWritable(0), new IntWritable(0));
    }

    public SumCount(Double sum, Integer count) {
        set(new DoubleWritable(sum), new IntWritable(count));
    }

    public void set(DoubleWritable sum, IntWritable count) {
        this.sum = sum;
        this.count = count;
    }

    public DoubleWritable getSum() {
        return sum;
    }

    public IntWritable getCount() {
        return count;
    }

    public void addSumCount(SumCount sumCount) {
        set(new DoubleWritable(this.sum.get() + sumCount.getSum().get()), new IntWritable(this.count.get() + sumCount.getCount().get()));
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        // serialize the fields in a fixed order: sum first, then count
        sum.write(dataOutput);
        count.write(dataOutput);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        // deserialize in the same order used by write()
        sum.readFields(dataInput);
        count.readFields(dataInput);
    }

    @Override
    public int compareTo(SumCount sumCount) {

        // compare the "sum" values first
        int comparison = sum.compareTo(sumCount.sum);

        // if they differ, that comparison decides the order
        if (comparison != 0) {
            return comparison;
        }

        // otherwise fall back to comparing the "count" values
        return count.compareTo(sumCount.count);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        SumCount sumCount = (SumCount) o;

        return count.equals(sumCount.count) && sum.equals(sumCount.sum);
    }

    @Override
    public int hashCode() {
        int result = sum.hashCode();
        result = 31 * result + count.hashCode();
        return result;
    }
}
As we can see, it's very easy to code the two methods defined by the Writable interface: they just call the write() and readFields() methods of the Writable fields of the SumCount class. It's important that readFields() reads the fields in the same order in which write() writes them, otherwise deserialization will not work. The other methods of this class are the getters, setters and the methods needed by the Comparable interface, which should be nothing new to a Java developer.
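To see why the field order matters, here is a small sketch that uses only java.io and no Hadoop classes. It assumes the same layout SumCount produces through DoubleWritable and IntWritable (a double followed by an int); FieldOrderDemo and its methods are hypothetical names introduced for this illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class FieldOrderDemo {
    // Writes the fields as "sum, then count", mirroring SumCount.write().
    static byte[] writeSumThenCount(double sum, int count) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeDouble(sum); // 8 bytes
            out.writeInt(count);  // 4 bytes
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Correct: reads the fields in the order they were written.
    static String readSumThenCount(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            double sum = in.readDouble();
            int count = in.readInt();
            return sum + "/" + count;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Wrong: swaps the order, so the bytes of the double are read as an int
    // and the remaining bytes are read as a double.
    static String readCountThenSum(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int count = in.readInt();
            double sum = in.readDouble();
            return sum + "/" + count;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] data = writeSumThenCount(3.5, 2);
        System.out.println("same order:    " + readSumThenCount(data));
        System.out.println("swapped order: " + readCountThenSum(data));
    }
}
```

In this particular case the swapped read does not even throw an exception, because both orders consume 12 bytes; it silently returns meaningless values, which makes this kind of bug hard to spot.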