In the first part of this series on Apache Hadoop, I explained how MapReduce works at a conceptual level. In this installment, the second of three, I show how to write code that runs on Hadoop — starting with a MapReduce program in Java.
Development Environment
To get started, we need Java (Oracle JDK 6 is required), Git, Maven, and Hadoop itself. Download the latest stable release of Apache Hadoop (1.0.4) from the releases page, then extract it to a suitable place. On my laptop:
% tar zxf hadoop-1.0.4.tar.gz
% export HADOOP_HOME=$(pwd)/hadoop-1.0.4
% $HADOOP_HOME/bin/hadoop version
Hadoop 1.0.4
In another directory, check out the Git repository that accompanies this article:
% git clone git://github.com/tomwhite/hadoop-drdobbs.git
% cd hadoop-drdobbs
% mvn install
The repository contains a small amount of sample data for testing:
% cat data/*.tsv
dobbs 2007 20 18 15
dobbs 2008 22 20 12
doctor 2007 545525 366136 57313
doctor 2008 668666 446034 72694
The file contains a few lines from the Google Books Ngram Dataset, which I mentioned in the first part of the series. To recap, the first line says that the word "dobbs" in books from 2007 occurred 20 times overall, and these occurrences were found on 18 pages in 15 books.
Java MapReduce
Let's write the MapReduce job to find the total count for each word. We start with the map function, which is represented in Java by an instance of org.apache.hadoop.mapreduce.Mapper.
The first thing we need to decide about our mapper is the types of the input key-value pairs and the output key-value pairs. The declaration of the Mapper class is:
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
Because we are processing text, we use TextInputFormat, which determines the input types for us as LongWritable and Text (both found in the org.apache.hadoop.io package). These Writable types are wrappers around standard Java types (long and String, in this case) optimized for efficiency of serialization. Authors of MapReduce programs can use the Writable types without worrying about serialization. The only time that you might be exposed to serialization is when writing a custom Writable type. In such cases, it is usually better to use a serialization library, such as Avro.
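To give a feel for what a custom Writable type involves, here is a minimal sketch using only java.io. It is not a Hadoop class: WordCountRecord and its roundTrip() helper are hypothetical names made up for illustration. It only mirrors the two-method shape of the Writable contract, write(DataOutput) and readFields(DataInput), where fields are written in a fixed order and read back in the same order.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a Writable-style type; NOT part of Hadoop.
class WordCountRecord {
    private String word = "";
    private long count;

    void set(String word, long count) {
        this.word = word;
        this.count = count;
    }

    String getWord() { return word; }

    long getCount() { return count; }

    // Serialize the fields in a fixed order.
    void write(DataOutput out) throws IOException {
        out.writeUTF(word);
        out.writeLong(count);
    }

    // Overwrite this object's fields from the stream, in the same order,
    // so that a single instance can be reused for many records.
    void readFields(DataInput in) throws IOException {
        word = in.readUTF();
        count = in.readLong();
    }

    // Illustration helper: serialize to a byte array and read it back.
    static WordCountRecord roundTrip(WordCountRecord original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.write(new DataOutputStream(bytes));
            WordCountRecord copy = new WordCountRecord();
            copy.readFields(new DataInputStream(
                new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Even in a type this small, the write and read order must be kept in step by hand, which is one reason a serialization library is usually the more comfortable choice.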
Going back to the input type, TextInputFormat presents the input to our mapper as (LongWritable, Text) pairs, like this:
(0, "dobbs 2007 20 18 15")
(20, "dobbs 2008 22 20 12")
(40, "doctor 2007 545525 366136 57313")
(72, "doctor 2008 668666 446034 72694")
The key is the offset within the file, and the value is the content of the line. It is the job of the mapper to extract the word and the number of occurrences, and ignore everything else. Therefore, its output is (word, count) pairs, of type (Text, LongWritable). The signature of the mapper looks like this:
public class ProjectionMapper extends Mapper<LongWritable, Text, Text, LongWritable>
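As an aside, the input keys 0, 20, 40, and 72 can be reproduced by hand. The sketch below is plain Java, not Hadoop code: LineOffsets is a hypothetical helper, and it assumes the sample data is a single file whose fields are separated by single tabs and whose lines end in a single newline character. In a real job, TextInputFormat does this bookkeeping itself.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only; in a real job TextInputFormat supplies the keys.
class LineOffsets {
    // The four sample records, assumed tab-separated and newline-terminated.
    static String sampleData() {
        return "dobbs\t2007\t20\t18\t15\n"
             + "dobbs\t2008\t22\t20\t12\n"
             + "doctor\t2007\t545525\t366136\t57313\n"
             + "doctor\t2008\t668666\t446034\t72694\n";
    }

    // Returns the byte offset at which each line of the contents starts.
    static List<Long> startOffsets(String fileContents) {
        List<Long> offsets = new ArrayList<>();
        long position = 0;
        for (String line : fileContents.split("\n")) {
            offsets.add(position);
            // +1 accounts for the newline that terminated this line
            position += line.getBytes(StandardCharsets.UTF_8).length + 1;
        }
        return offsets;
    }

    public static void main(String[] args) {
        System.out.println(startOffsets(sampleData())); // prints [0, 20, 40, 72]
    }
}
```

The keys are byte positions, not line numbers, which is why the third and fourth records are 32 bytes apart while the first two are 20 bytes apart.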
All that remains is for us to write the implementation of the map() method. The source for the whole mapper class appears in Listing One (ProjectionMapper.java).
Listing One: ProjectionMapper.java.
package com.tom_e_white.drdobbs.mapreduce;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class ProjectionMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

  // Reused across calls to map() to avoid creating objects per record
  private Text word = new Text();
  private LongWritable count = new LongWritable();

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    // value is tab separated values: word, year, occurrences, #pages, #books
    // we project out (word, occurrences) so we can sum over all years
    String[] split = value.toString().split("\t+");
    word.set(split[0]);
    if (split.length > 2) {
      try {
        count.set(Long.parseLong(split[2]));
        context.write(word, count);
      } catch (NumberFormatException e) {
        // cannot parse - ignore
      }
    }
  }
}
There are a few things to note about this code. First, there are two instance variables, word and count, which are used to store the map output key and value. The map() method is called once per input record, so it pays to avoid unnecessary object creation. The body of map() is straightforward: It splits the tab-separated input line into fields, and uses the first field as the word, and the third as the count. The map output is written using the write() method in Context. For simplicity, this code ignores lines with an occurrence field that is not a number, but there are other actions you could take, such as incrementing a MapReduce counter to track how many lines it affects (see the getCounter() method on Context for details).
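The projection logic can also be tried outside Hadoop. The following is a plain-Java sketch, not the article's mapper: ProjectionSketch is a hypothetical class, and its skipped field is only a stand-in for a MapReduce counter. In a real mapper, the tally would go through the counter obtained from Context instead.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of the projection; the skipped field is a hypothetical
// stand-in for a MapReduce counter, not Hadoop's counter API.
class ProjectionSketch {
    private long skipped = 0;

    // Projects each tab-separated line to (word, occurrences), tallying
    // lines whose occurrence field is not a number.
    List<Map.Entry<String, Long>> projectAll(List<String> lines) {
        List<Map.Entry<String, Long>> output = new ArrayList<>();
        for (String line : lines) {
            String[] split = line.split("\t+");
            if (split.length > 2) {
                try {
                    output.add(new SimpleEntry<String, Long>(
                        split[0], Long.valueOf(split[2])));
                } catch (NumberFormatException e) {
                    skipped++; // a real mapper would increment a counter here
                }
            }
        }
        return output;
    }

    long getSkipped() { return skipped; }
}
```

As in Listing One, lines with fewer than three fields are dropped silently. Only unparseable occurrence fields are tallied, which matches the case described above.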
Running through our tiny dataset, the map output looks like this:
("dobbs", 20)
("dobbs", 22)
("doctor", 545525)
("doctor", 668666)
As I discussed in the first part of the series, Hadoop transforms the map output so that the values are brought together for a given key, in a process called the shuffle. In our abstract representation, the input to the reduce step looks like this:
("dobbs", [20, 22])
("doctor", [545525, 668666])
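The grouping itself can be modeled in a few lines of plain Java. This is only an in-memory sketch under simplifying assumptions: ShuffleSketch is a hypothetical class, and the real shuffle also partitions the data and moves it between machines, none of which is shown here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// In-memory model of the grouping step only; not how Hadoop implements the shuffle.
class ShuffleSketch {
    // Collects the values for each key, with keys held in sorted order.
    static Map<String, List<Long>> group(String[] keys, long[] values) {
        Map<String, List<Long>> grouped = new TreeMap<>();
        for (int i = 0; i < keys.length; i++) {
            grouped.computeIfAbsent(keys[i], k -> new ArrayList<>()).add(values[i]);
        }
        return grouped;
    }

    public static void main(String[] args) {
        String[] keys = {"dobbs", "dobbs", "doctor", "doctor"};
        long[] values = {20, 22, 545525, 668666};
        // prints {dobbs=[20, 22], doctor=[545525, 668666]}
        System.out.println(group(keys, values));
    }
}
```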
All our reduce implementation has to do is sum the counts. We need an implementation of org.apache.hadoop.mapreduce.Reducer with the following signature:
public class LongSumReducer extends Reducer<Text, LongWritable, Text, LongWritable>
We could write the class ourselves, but we don't need to because Hadoop comes with an implementation, shown in Listing Two (LongSumReducer.java).
Listing Two: LongSumReducer.java (code from Apache Hadoop project).
package org.apache.hadoop.mapreduce.lib.reduce;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class LongSumReducer<KEY> extends Reducer<KEY, LongWritable,
                                                 KEY, LongWritable> {

  private LongWritable result = new LongWritable();

  public void reduce(KEY key, Iterable<LongWritable> values,
                     Context context) throws IOException, InterruptedException {
    long sum = 0;
    for (LongWritable val : values) {
      sum += val.get();
    }
    result.set(sum);
    context.write(key, result);
  }

}
Notice that the reduce() method signature is different from map() because it has an iterator over the values, rather than a single value. This reflects the grouping that the framework performs on the values for a key. In LongSumReducer, the implementation is very simple: It sums the values, then writes the total out using the same key as the input.
The output of the reducer will be:
("dobbs", 42)
("doctor", 1214191)
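These totals are easy to check by hand. As a small plain-Java sketch (SumSketch is a hypothetical helper, not part of Hadoop), the same loop that appears in reduce() can be run over the grouped values for each key:

```java
import java.util.Arrays;

// Plain-Java check of the reducer arithmetic; not a Hadoop Reducer.
class SumSketch {
    // Mirrors the loop in reduce(): add up every value seen for one key.
    static long sum(Iterable<Long> values) {
        long total = 0;
        for (long value : values) {
            total += value;
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println("dobbs " + sum(Arrays.asList(20L, 22L)));           // dobbs 42
        System.out.println("doctor " + sum(Arrays.asList(545525L, 668666L))); // doctor 1214191
    }
}
```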