学科分类
目录
Hadoop技术栈

Reducer组件

Map过程输出的键值对,将由Reducer组件进行合并处理。Hadoop提供了一个抽象类Reducer,该类的定义代码如下所示:

 1  public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {

 2  protected void setup(Context context

 3    ) throws IOException, InterruptedException {

 4    // NOTHING

 5  }

 6  @SuppressWarnings("unchecked")

 7  protected void reduce(KEYIN key, Iterable<VALUEIN> values, 

 8      Context context) throws IOException, InterruptedException {

 9    for(VALUEIN value: values) {

 10     context.write((KEYOUT) key, (VALUEOUT) value);

 11   }

 12  }

 13 protected void cleanup(Context context

 14        ) throws IOException, InterruptedException {

 15   // NOTHING

 16  }

 17  public void run(Context context) throws IOException, InterruptedException 

 18 {

 19   setup(context);

 20   try {

 21   while (context.nextKey()) {

 22     reduce(context.getCurrentKey(), context.getValues(), context);

 23      // If a back up store is used, reset it

 24      Iterator<VALUEIN> iter = context.getValues().iterator();

 25      if(iter instanceof ReduceContext.ValueIterator) {

 26       ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();    

 27      }

 28     }

 29   } finally {

 30    cleanup(context);

 31   }

 32  }

 33 }

上述代码中,当用户的应用程序调用Reducer类时,会直接调用Reducer类里面的run()方法,该方法中定义了setup()、reduce()、cleanup()三个方法的执行顺序:setup→reduce→cleanup。

默认情况下,setup()和cleanup()方法内部不做任何处理,也就是说,reduce()方法是处理数据的核心方法,该方法接收Map阶段输出的键值对数据,对传入的键值对数据进行处理,并产生最终的某种形式的结果输出。

值得一提的是,如果reduce()方法不符合应用要求时,可以尝试在setup()和cleanup()方法中添加代码满足应用要求,setup()方法一般会在reduce()方法之前执行,我们可以在setup()方法中做一些初始化工作,如任务的一些配置信息。cleanup()方法一般会在reduce()方法之后执行,我们可以在cleanup()方法中做一些结尾清理的工作,如资源释放等。

如果我们想自定义reduce()方法,我们只需要继承Reducer类并重写reduce()方法即可。接下来,我们以词频统计为例,自定义一个reduce()方法,具体代码如文件所示。

文件 WordCountReducer.java

1  import java.io.IOException;

 2  import org.apache.hadoop.io.IntWritable;

 3  import org.apache.hadoop.io.Text;

 4  import org.apache.hadoop.mapreduce.Reducer;

 5  public class WordCountReducer extends Reducer<Text, IntWritable,

 6                             Text, IntWritable> {

 7    @Override

 8    protected void reduce(Text key, Iterable<IntWritable> value,

 9        Reducer<Text, IntWritable, Text, IntWritable>.Context 

 10            context) throws IOException, InterruptedException {

 11     //定义一个计数器

 12     int count = 0;

 13     //遍历一组迭代器,把每一个数量1累加起来就构成了单词的总次数

 14     for (IntWritable iw : value) {

 15       count += iw.get();

 16     }

 17     context.write(key, new IntWritable(count));

 18   }

 19 }
点击此处
隐藏目录