之前介绍了 MapReduce 的原理和 MapReduce 任务的提交与切片相关的源码,知道了MapReduce的原理,这里分析一下具体执行Map任务的MapTask的源码,看一下Map任务的具体源码实现。
这是 MapReduce 源码分析系列的第二篇文章,有关 MapReduce 相关的所有内容可以到这里查看。
在MapReduce 任务的提交与切片这篇文章中,从源码角度看了一遍 MapReduce 任务从创建到提交,然后分片的整个流程,了解了分片机制的具体实现。有关资源调度的内容非常复杂,这里先不展开,回头有时间再好好研究一下,下面介绍一下节点 Container 中执行 Map 任务的 MapTask 的源码实现。
MapTask 介绍
MapTask 是具体执行 MapReduce 作业的地方,它会将输入的数据输出成Key-Value的形式存放到集群或者本地磁盘,然后由 ReduceTask 读取、聚合、输出到目标文件。
在 MapTask 中 map
函数可以由用户重写,进而实现自己想要的业务功能;另外在 Map 阶段还会进行数据混洗排序(shuffle);由于 Map 阶段输出结果是要保存到磁盘的,而磁盘进行IO读写是非常耗费资源的事情,大数据下进行频繁磁盘读写更加会降低效率,所以在 Map 阶段框架内会做一些优化问题,比如使用环形缓冲区降低数据溢写到磁盘的频率,合并时的合并顺序的优化等等。
Mapper 类的使用
先看一下怎么使用框架提供的 Mapper 类的,当用户需要自定义(大部分情况下都是需要的) Map 阶段数据处理所作的事情时,就需要创建一个自定义类继承自 org.apache.hadoop.mapreduce.Mapper
,并自定义一个 map
函数,如下所示,完整代码点击这里:
1 | public static class LogMapper extends Mapper<LongWritable, Text, NullWritable, Text> { |
设置好自定义 Map 类后,在创建 Job 时将 Map 类配置进去即可,类似于这样:
1 | public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { |
Mapper类做了什么
Mapper 中就是在一个循环中调用 map
方法:
1 | /** |
这里有一个 setup
和 cleanup
方法,这两个方法都是只在创建Mapper对象的时候执行一次,适合做一些初始化和清理资源的工作。
这里的run方法是由框架调用的,在调用到map函数时,由框架利用反射机制进行Mapper的实例化然后调用的run。
我们在之前的YARN原理中说给,节点最终的执行任务都是由YARN在节点的container中先启动一个YarnChild,然后又YarnChild开启MapTask或ReduceTask的。这是Yarn的执行流程:
程序具体执行时会启动一个叫 YarnChild 的类,在里面调用run防止执行具体的Task任务,相关具体代码如下。
1 | public static void main(String[] args) throws Throwable { |
MapTask 是什么
MapTask 是具体执行map任务的代码实现。在 MapTask 中,run
函数的代码如下:
1 |
|
使用 getNumReduceTasks
获取 reduce 的数量,如果有reduce就增加排序,如果没有就不排序。Reduce 的数量可以自己设置,在job中通过setNumReduceTasks
可以手动设置reduce的数量。
接下来是一些框架调度的内容,先跳过,来到 runNewMapper
使用新的API启动Map,这是重点,解析来分析一下 runNewMapper
这个方法。
MapTask任务的具体执行者 - runNewMapper
runNewMapper
的代码如下:
1 | "unchecked") ( |
读取Map输入的类 - NewTrackingRecordReader
在 runNewMapper
方法中,有这样一段代码:
1 | // make the input format |
这段代码的作用是创建Map输入读取类,用于后续帮助Map从分片中读取数据,打开 NewTrackingRecordReader 继续跟踪进去:
1 | NewTrackingRecordReader(org.apache.hadoop.mapreduce.InputSplit split, |
NewTrackingRecordReader
的构造函数创建了一个 private final org.apache.hadoop.mapreduce.RecordReader<K,V>
的 RecordReader 类,默认情况下 Map 的 inputFormat 是 TextInputFormat 类型,我们跟踪进 TextInputFormat 中看一下它的 createRecordReader
方法:
1 |
|
可以看到它返回的是一个 LineRecordReader 类型的 RecordReader,这里 LineRecordReader 是一个线性读取文件的工具类,也就是说我们在使用这个类读取Map的输入数据时是一行一行线性读取的,LineRecordReader 里面包含一些我们常用的方法,比如 getCurrentKey
、getCurrentValue
、nextKeyValue
等方法。
分区及合并的实现 - NewOutputCollector
在 runNewMapper
方法中,有这样一段代码:
1 | // get an output object |
如果没有Reduce Task,直接输出到磁盘目录,如果没有,就创建一个 NewOutputCollector
对象,在里面进行分区和排序后输出到磁盘。
NewOutputCollector 中实现了具体的分区操作:
1 | "unchecked") ( |
Partitioner 类就是在这里被实例化的,如果分区数量大于1,就通过反射创建一个Partitioner 实例,如果分区数量小于等于1,就直接返回分区数量为-1,最终就是只创建一个分区。
map中的write的具体实现
我们知道自定义的map函数最终要调用 context.write
将结果写出,这里的 context 就是在 MapTask 中我们创建的 NewOutputCollector 对象中实现的:
先创建一个mapContext,再利用mapContext创建一个mapperContext。
1 | org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> |
接下来调用 mapper.run(mapperContext)
,最终执行的就是mapperContext中的write方法。
1 | try { |
我们看一下 NewOutputCollector 是怎么实现write方法的:
1 |
|
再深入,进入 collector 的实现类 MapOutputBuffer,这个类也在MapTask中,是个内部类,它的 collect
实现如下:
1 | if (bufferRemaining <= 0) { |
这里有一个 spillLock
锁, 这个锁跟 MapTask 中的另一个进行排序、合并、写入磁盘操作的内部类共享:
1 | protected class SpillThread extends Thread { |
这里的 sortAndSpill
方法就是进行排序和溢写的方法,具体实现接着往下看:
1 | private void sortAndSpill() throws IOException, ClassNotFoundException, |
初始化及运行
上面配置好Map的输入读取工具类以及分区和溢写方法后,在 MapTask
中,接下来就是初始化和运行的代码:
1 | try { |
我们知道 input 的类型是 NewTrackingRecordReader ,而 NewTrackingRecordReader 最终是创建了一个 LineRecordReader ,所以这里的 initialize
最终调用的是 LineRecordReader 中的 initialize
方法,代码如下:
1 | public void initialize(InputSplit genericSplit, |
这里初始化所作的工作就是:
从切片中获得文件位置、起始偏移量start、结束偏移量end。
通过文件位置信息获取到分布式文件的 FileSystem ,然后打开文件。
判断是否是压缩文件,如果不使用压缩,就直接从分配给自己的起始偏移量start的位置开始读取文件。如果使用了压缩就需要读取所有的文件内容后再解压缩,然后再进行处理。根据输入数据是否进行了压缩,设置不同的 SplitLineReader 进行接下来的数据读取操作。
判断是否为第一个切片,如果不是第一个切片就更新文件起始偏移量的值start。这里采用的是直接忽略掉块的第一行的做法,因为数据可能分布在不同的块中,进而导致数据的不完整性。
nextKeyValue 、 getCurrentKey 、 getCurrentValue
在最开始提到的Mapper中,map是夹在while循环中的,代码如下:
1 | while (context.nextKeyValue()) { |
这里的 nextKeyValue、getCurrentKey、getCurrentValue
三个方法通过我们分析,可以知道就是 LineRecordReader 中的三个方法。
nextKeyValue
LineRecordReader 中 nextKeyValue
的代码实现如下:
1 | public boolean nextKeyValue() throws IOException { |
所做的事情就是:
设置默认key类型为LongWritable,value默认类型为Text类型
将行偏移量赋值给key,也就是说Map的输入Key存放的就是行的偏移量。
从 UncompressedSplitLineReader 中读取文件数据到value中。
读完将偏移量信息加上读取的数据长度。
如果最终没有数据可读就退出。
getCurrentKey 、 getCurrentValue
这两个在上面的 nextKeyValue
中已经赋值过,直接返回就行,很简单,代码如下:
1 |
|
总结
有点乱,我是一遍翻看源码,一遍查资料,反复看了很多遍,总结下来就是MapTask是由每个节点的YarnChild启动的,在真正开始运行map之前,会先根据job配置确定好输入输出类型、是否压缩、输入文件的路径信息、分片信息,加载好相应的读取工具类,然后进行初始化操作,确认好文件偏移量信息。在最终执行读取操作的时候利用已经配置好的工具类,逐行将文件内的数据读取到内存供 map
函数使用。当然这中间还会加入许许多多错误判断机制和默认设置,但是Map的大致流程就是上面说的那样了。源码分析起来好累!