Reducer组件
Map过程输出的键值对,将由Reducer组件进行合并处理。Hadoop提供了一个抽象类Reducer,该类的定义代码如下所示:
1 public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
2 protected void setup(Context context
3 ) throws IOException, InterruptedException {
4 // NOTHING
5 }
6 @SuppressWarnings("unchecked")
7 protected void reduce(KEYIN key, Iterable<VALUEIN> values,
8 Context context) throws IOException, InterruptedException {
9 for(VALUEIN value: values) {
10 context.write((KEYOUT) key, (VALUEOUT) value);
11 }
12 }
13 protected void cleanup(Context context
14 ) throws IOException, InterruptedException {
15 // NOTHING
16 }
17 public void run(Context context) throws IOException, InterruptedException
18 {
19 setup(context);
20 try {
21 while (context.nextKey()) {
22 reduce(context.getCurrentKey(), context.getValues(), context);
23 // If a back up store is used, reset it
24 Iterator<VALUEIN> iter = context.getValues().iterator();
25 if(iter instanceof ReduceContext.ValueIterator) {
26 ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();
27 }
28 }
29 } finally {
30 cleanup(context);
31 }
32 }
33 }
上述代码中,当用户的应用程序调用Reducer类时,会直接调用Reducer类里面的run()方法,该方法中定义了setup()、reduce()、cleanup()三个方法的执行顺序:setup→reduce→cleanup。
默认情况下,setup()和cleanup()方法内部不做任何处理,也就是说,reduce()方法是处理数据的核心方法,该方法接收Map阶段输出的键值对数据,对传入的键值对数据进行处理,并产生最终的某种形式的结果输出。
值得一提的是,如果reduce()方法不符合应用要求时,可以尝试在setup()和cleanup()方法中添加代码满足应用要求,setup()方法一般会在reduce()方法之前执行,我们可以在setup()方法中做一些初始化工作,如任务的一些配置信息。cleanup()方法一般会在reduce()方法之后执行,我们可以在cleanup()方法中做一些结尾清理的工作,如资源释放等。
如果我们想自定义reduce()方法,我们只需要继承Reducer类并重写reduce()方法即可。接下来,我们以词频统计为例,自定义一个reduce()方法,具体代码如文件所示。
文件 WordCountReducer.java
1 import java.io.IOException;
2 import org.apache.hadoop.io.IntWritable;
3 import org.apache.hadoop.io.Text;
4 import org.apache.hadoop.mapreduce.Reducer;
5 public class WordCountReducer extends Reducer<Text, IntWritable,
6 Text, IntWritable> {
7 @Override
8 protected void reduce(Text key, Iterable<IntWritable> value,
9 Reducer<Text, IntWritable, Text, IntWritable>.Context
10 context) throws IOException, InterruptedException {
11 //定义一个计数器
12 int count = 0;
13 //遍历一组迭代器,把每一个数量1累加起来就构成了单词的总次数
14 for (IntWritable iw : value) {
15 count += iw.get();
16 }
17 context.write(key, new IntWritable(count));
18 }
19 }