之前介绍了 MapReduce 的原理、 MapReduce 任务的提交与切片以及Map 任务的执行者MapTask相关的源码,知道了 MapReduce 的原理并了解了部分 Map 任务源码的实现,这里分析一下具体执行 Reduce 任务的 ReduceTask 的源码,看一下 Reduce 任务的具体源码实现。
这是 MapReduce 源码分析系列的第三篇文章,有关 MapReduce 相关的所有内容可以到这里查看。
之前看到了 MapTask 任务的具体执行流程,Map 任务执行的输出的一个或多个结果最终会交给 Reduce 端,由 Reduce 汇聚处理之后输出。现在就看一下 ReduceTask 的具体源码,看一下 Reduce 端都做了些什么事。这里有关 Shuffle 的代码先不分析,主要看 Reduce 端的实现。
自定义 Reduce 处理逻辑 - Reducer
在编写 Reduce 端处理逻辑时一般需要继承 org.apache.hadoop.mapreduce.Reducer
类,然后重写 reduce
方法,就可以用自定义的 reduce
方法来处理数据了。
1 | public static class LogReducer extends Reducer<TextLongWritable, LogGenericWritable, Text, Text> { |
Reducer类中有一个 run
方法,reduce函数就是在那里被调用的:
1 | /** |
这里有关 context
上下文的介绍可以参考 MapTask源码 那篇文章,这里需要注意的一点是 reduce 端最终汇聚到一个 Reduce 数据量可能会远远超出系统内存,Reduce 是怎么处理这种情况的?
下面开始看一下 ReduceTask 这个类。
Reduce 任务的实际执行者 - ReduceTask
跟 MapTask 一样,ReduceTask 会从run方法开始执行:
1 |
|
Reduce 端的输入迭代器
这里面有一个 RawKeyValueIterator
类型的迭代器,用于迭代 Reduce 端的输入数据:
1 | public void run(JobConf job, final TaskUmbilicalProtocol umbilical) |
这里的 shuffleConsumerPlugin.run()
运行 Shuffle 从多个 Map 中拉取数据到 Reduce 端,供 Reduce 消费。
Reduce 端的 RawComparator
分组比较器
MapTask 中的 run
方法下还有一个设置分组比较器的代码:
1 | public void run(JobConf job, final TaskUmbilicalProtocol umbilical) |
设置分组比较器的作用是为 reduce 进行分组, getOutputValueGroupingComparator
的实现如下:
1 | /** |
先通过 GROUP_COMPARATOR_CLASS
看一下用户有没有配置分组比较器,如果有就使用用户自定义分组比较器,如果没有就调用 getOutputKeyComparator()
返回一个默认的分组比较器。
看一下默认的分组比较器是什么,进入 getOutputKeyComparator
,代码如下:
1 | /** |
getOutputKeyComparator
中如果用户设置了 KEY_COMPARATOR
,就用用户自定义的 Key 比较器,如果用户没有定义Key比较器,就使用 map 端的key比较器进行分组。
runNewReducer
进入reduce任务
1 | "unchecked") ( |
在 runNewReducer
中,先获取上下文、反射获取 Reducer 对象,最后创建 reducerContext
上下文,这是关键,下面看一下创建 reducerContext
的 createReduceContext()
是怎么实现的:
1 | "unchecked") ( |
这里创建了 ReduceContextImpl 类的对象 reduceContext
,然后由 reduceContext
创建出 reducerContext
返回。ReduceContextImpl 具体实现如下:
1 | public ReduceContextImpl(Configuration conf, TaskAttemptID taskid, |
这里就是根据参数调用构造函数创建出一个 ReduceContextImpl 对象,需要特别注意的是这里的 input,其实就是 Reducer 中调用的 context.nextKey()
时的输入迭代器。
Reduce中的 nextKey getCurrentKey getValues
ReduceContextImpl 中还有 nextKey
、 getCurrentKey
、getValues
方法的具体实现。
nextKey
以nextKey为例:
1 | /** Start processing next unique key. */ |
先判断是否还有输入数据,如果有数据而且下一个数据的key跟当前的key相同,就直接调用 nextKeyValue
寻找下一个数据,如果有数据但是跟当前的key不相同,就将记录key数量的计数器加1,然后再寻找下一个数据。
看一下 nextKeyValue
的具体实现:
1 | /** |
其实 nextKeyValue
的主要功能就是寻找下一个数据
nextKeyValue 做的事情有:
如果hasMore为假表示没有数据了,就清空 key 和 value 的值,直接返回false;如果hasMore为真就继续往下走。
读取数据,先进性反序列化,对key和value赋值。
判断是否还有数据,如果有就比较当前 key 和下一条数据中的 key 是否相等,如果相等,nextKeyIsSame为true,如果不相等,nextKeyIsSame为false。
最后没调用一个nextKeyValue,
inputValueCounter
计数器就加1.
getCurrentKey
getCurrentKey 直接返回key就行了
1 | public KEYIN getCurrentKey() { |
getValues
1 | private ValueIterable iterable = new ValueIterable(); |
这里直接返回了一个 iterable
内部类 ValueIterable.
1 | protected class ValueIterable implements Iterable<VALUEIN> { |
而 ValueIterable 中有创建了一个 ValueIterator 的内部类, ValueIterator实现如下:
1 | protected class ValueIterator implements ReduceContext.ValueIterator<VALUEIN> { |
这里面比较重要的时 next
和 hasNext
方法, hasNext
方法通过 firstValue || nextKeyIsSame
知道接下来还有没有数据可以使用,这里使用 nextKeyIsSame
作为条件是因为Reduce处理数据迭代器是针对相同 key 的数据放入同一个迭代器中,所以在实现时就采用 nextKeyIsSame
来判断下一个是否还有相同 key 值的数据可供读取。
ValueIterator 中的 next
方法最终调用的还是 ReduceContextImpl 中的 nextKeyValue
方法,因为最终读取数据还是要使用 ReduceContextImpl 中的 input,只不过用 nextKeyIsSame
来判断要不要放到同一个迭代器中给用户使用而已。
总结
Reduce端通过迭代器一次读取两条数据,第一条用于读取数据,第二条用于判断第二条数据是否交由同一个Reduce进行处理。所以即使数据量在Reduce端再大,也不会导致内存不足的情况发生,因为我们一次只需要能够存放两条数据的剩余内存即可,通过反复迭代,达到处理所有数据的效果。