Task运行过程分析.docx
《Task运行过程分析.docx》由会员分享,可在线阅读,更多相关《Task运行过程分析.docx(5页珍藏版)》请在冰点文库上搜索。
MapTask:
read,map,collect,spill,combine
ReduceTask:
shuffle,merge,sort,reduce,write
-----------------------------------------------MapTask内部实现--------------------------------------------------------------------
每个map处理一个输入数据的InputSplit,并将产生的若干数据片段写到本地磁盘上,ReduceTask则从每个maptask上远程拷贝相应的数据片段,经
分组聚集和规约后,将结果写到HDFS上作为最终结果。
MapTask将中间计算结果存放在本地磁盘上,而ReduceTask通过HTTP请求各个MapTask端pull相应的数据,为了支持大量的ReduceTask并发从Map
Task端拷贝数据,Hadoop采用了JettyServer作为httpserver处理并发数据请求。
MapTask执行过程:
首先通过用户提供的InputFormat将对应的InputSplit解析成一系列key/value,并以此交给用户编写的map函数处理,接着按照
制定的partitioner对数据分片,以确定每个key/value输入到那个ReduceTask处理,之后将数据交给用户定义的combiner进行以此本地规约,最后
将结果存储在本地磁盘。
ReduceTask执行过程:
首先通过http请求从各个已经完成的MapTask上拷贝Reduce对于的数据片段,所有数据拷贝完之后,再以key为关键字对所有
数据进行排序,通过排序,key相同的记录聚集在一起形成若干分组,然后将每组数据交给用户编写的reduce函数处理,并将最终的数据结果写到HDFS
上作为最终输出结果。
Hadoop内部实现了基于行压缩的数据存储格式IField,避免不必要的磁盘和网络开销。
排序:
MapTask和ReduceTask均会对数据按照key进行排序。
对于MapTask,将处理的结果暂时放在一个缓存中,当缓冲区使用率达到一定的阀值后,再对缓冲区的数据进行以此排序,并将这些有序数据以IFile的
形式存储在磁盘上,当所有的数据存储完之后,它会对磁盘上所有的文件进行以此合并,以将这些文件合并成一个大的有序的文件。
对于ReduceTask,从每个MapTask上远程拷贝相应的数据,如果文件大于一个阀值,则放在磁盘上,否则放在内存中。
如果磁盘上的文件达到一定的
阀值,则进行以此合并并生成一个较大的文件,如果内存中文件大小或者数目超过一定的阀值,则进行以此合并,并将文件写到磁盘上,当所有的数据
拷贝完之后,ReduceTask对内存和磁盘上的数据进行以此合并。
快速排序
文件归并由类Merger完成,它要求待排序对象需要segment实例化对象,segment是对磁盘和内存中的IFile格式文件的抽象,具有迭代器的作用。
可迭代
读取IFile文件中的key/value记录。
Reporter:
所有的Task需要周期性向TaskTracker汇报最新进度和计数器值,是由Reporter组件实现的。
在map/reduceTask中,TaskReporter实现了Reporter接口,并且以线程的形式启动,TaskReporter汇报的信息:
任务执行进度,任务计数器的值。
任务执行进度:
任务执行进度信息被封装在Progress中,且每个Progress实例以树的形式存在。
对于MapTask而言,它作为一个大阶段不可再分,则该阶段进度值可表示
成已读取数据量占总数据量的比例。
对于ReduceTask而言,分成三个阶段:
shuffle,sort,reduce,每个阶段占任务的1/3,考虑到在shuffle阶段,reduceTask需从M个MapTask上读取一片
数据,因此被分成M个阶段,每个阶段占shuffle进度的1/M。
TaskReporter发现下面两种状态之一才会汇报:
任务执行进度发生变化;任务的某个计数器值发生变化。
在某个时间间隔内,如果任务执行进度和计数器均未发生变化,Task只会简单的通过RPC函数ping探测到的TaskTracker是否活着。
如果在一定的时间内,某个
任务执行进度和计数器均未发生变化,则TaskTracker认为其处于悬挂状态,直接将其杀掉,但是为了防止某条记录因处理时间过长而导致被杀,采用两种
方法:
每隔一段时间调用一次TaskProgress.progress()函数,以告诉TaskTracker自己仍然活着;增大任务超时参数;
MapTask分为四种:
Job-setupTask,job-cleanupTask,Task-cleanupTask,MapTak
Job-setupTask,job-cleanupTask是作业运行时启动的第一个和最后一个任务,主要工作分别是进行一些作业初始化和收尾工作。
比如创建和删除作业临时
文件,Task-cleanupTask则是任务被杀或失败后用于清理已写入临时目录中数据的任务。
MapTask计算流程分为五个阶段:
read,map,collect,spill,combine
read阶段:
MapTask通过用户编写的RecordReader,从输入的InputSplit中解析出一个个key/value
Map阶段:
该阶段主要是将解析出的key/value交给用户编写的map()函数处理,并产生一系列新的key/value
collect阶段:
用户编写的map函数中,当数据处理完之后,一般会调用OutputCollector.collect()输出结果,在该函数内部它会将生成的key/value分片写入
一个环形的内存缓冲区内。
spill阶段:
即“溢出”,当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件,需要注意的是,讲数据写到磁盘之前,需要将数据
进行排序,并在必要时对数据进行排序合并等操作。
combine阶段;当所有的数据处理完之后,MapTask对所有的临时文件进行以此合并,以确保最终只会生产一个数据文件。
在这个数据文件中,数据也是逻辑分块
的,相邻位置的数据一般是对应的是同一个reduce。
MapTask中最重要的是输出结果在内存和磁盘中的组织方式,涉及到collect,spill,combine三个阶段。
collect过程:
map函数处理完一堆key/value,产生新的key/value,会调用OutputCollector.collect()函数输出结果。
MapOutputBuffer采用的是环形内存缓冲区保存数据,缓冲区达到一定的阀值后,由线程SpillThread将数据写到一个临时文件,当数据处理完之后,对所有
的临时文件进行合并生成一个最终的文件。
环形缓冲区是的MapTask的Collect阶段和Spill阶段可并行运行。
MapOutputBuffer内部采用两级索引结构,设计三个环形内存缓冲区,分别是kvoffsets,kvindices和kvbuffer,这三个缓冲区的大小(io.sort.mb)默认为
100M.
kvoffsets:
偏移量索引数组,用来保存key/value信息在位置索引kvindices中的偏移量。
一个key/value对应一个int大小,数组kvindices有三个int大小,
数组kvindices包括(所在的partition号,key开始位置,value开始位置),Hadoop安装1:
3的大小分配。
kvindices:
位置索引位置,保存key/value值在数据缓冲区kvbuffer中的起始位置。
kvbuffer:
kvbuffer即数据缓冲区,用于保存实际的key/value值,默认情况下最多可使用io.sort.mb的95%,当缓冲区使用率超过io.sort.spill.percent后,
就触发SpillThread进程将数据写入磁盘。
OutputCollector.collect()方法和OutputCollector.write()方法是生产者,SpillThread是消费者,它们之间的同步是通过可重入的互斥锁spillLock和
spillLock上的两个条件变量完成。
-------------------------------------------------三个环形缓冲区的工作步骤(省略……)-------------------------------------------------------
spill过程分析:
由SpillThread线程完成,该线程实际上是kvbuffer的消费者。
该线程调用sortAndSpill()将缓冲区间内的数据写到磁盘上
步骤1:
利用快速排序对kvbuffer中的数据进行排序,排序方法,先按照分区号partition进行排序,然后按照key值进行排序。
步骤2:
按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件中,如果用户配置了combiner,则写入文件之前,对每个分区的数据
进行以此聚集操作。
步骤3:
将分区数据的元信息写到内存索引数据结构SpillRecord中,其中,每个分区的元信息包括临时文件的偏移量,压缩前数据大小和压缩后数据大小。
如果
当前内存索引大小超过1MB,则将内存索引写到文件output/spillN.out.index中。
combine过程分析:
当索所有的数据处理完之后,MapTask会将临时文件合并成一个大文件,并保存在Output/file.out中,同时生成相应的索引文件Output/file.out.index
在进行文件合并过程中,MapTask以分区为单位进行合并,对于某个分区,它将采取多轮递归合并的方式,并将生成的文件重新添加到待合并列表中,对文件
排序后重复上面过程,直到最终生成一个大文件。
让每一个MapTask最终生成一个大文件,可避免同时打开大量的文件和同时读取大量小文件产生的随机读取带来的开销。
-----------------------------------------------ReduceTask内部实现--------------------------------------------------------------------
ReduceTask也分为四类:
Job-setupTask,job-cleanupTask,Task-cleanupTask,ReduceTask
ReduceTask需要从各个MapTask上读取数据,经排序后以组为单位交给用户编写的Reduce函数,并将结果写到HDFS上。
ReduceTask分为五个阶段:
shuffle阶段:
也称为copy阶段,ReduceTask从各个MapTask上拷贝一片数据,并针对这一片数据,如果其大小超过一定的阀值,就写到磁盘上,否则直接放到内存中。
Merge阶段:
在远程拷贝数据的同时,ReduceTask启动两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。
sort阶段:
按照MapReduce语义,用户编写的reduce()函数输入数据是按key进行聚集的一组数据,为了将key相同的数据聚集在一起,Hadoop采用了基于排序的策略。
由于每一个MapTask已经实现对自己的处理结果进行局部排序,因此ReduceTask只需要对所有的数据进行以此归并排序即可。
Reduce阶段:
在该阶段中,ReduceTask将每组数据依次交给用户编写的Reduce函数处理。
Write阶段:
Reduce函数将结果写到HDFS中。
shuffle,merge阶段解析:
在ReduceTask中shuffle和merge是并行执行的,当远程数据量超过一定的阀值之后就会触发相应的合并进程对数据进行合并,这两个阶段都是由ReduceCorpier实现的
shuffle,merge阶段分为三个子阶段:
1.准备运行完成的MapTask列表。
GetMapEventsThread线程周期性通过RPC从TaskTracker获取已完成的MapTask列表,并保存到映射表mapLocations(保存了TaskTracker
host与已完成任务列表的映射关系)中。
为防止出现网络热点,ReduceTask通过对所有TaskTrackerHost进行“混洗”操作以打乱数据拷贝顺序,并将调整后的MapTask
输出数据位置保存到ScheduleCopies列表。
TaskTracker启动MapEventsFetcherThread线程,该线程周期性(心跳)通过RPC从JobTracker上获取已经完成的MapTask列表,并保存到TaskComplerionEvent类型列表
allMapEvents中。
对于ReduceTask而言,它会启动GetMapEventsThread线程,该线程周期性的通过RPC从Tasktrcker上获取已完成的MapTask列表,将成功完成的Map
Task放到表mapLocations中,为了避免出现访问数据热点(大量进程集中读取某个TaskTracker上的数据),ReduceTask不会直接将列表mapLocations中的MapTask
输出数据的位置交给mapOutputCorpier线程,而是实现进行预处理,将所有的TaskTrackerhost进行混洗操作,然后保存到ScheduleCopies列表中,mapOutputCorpier
线程从该列表中获取拷贝的MapTask输出数据的位置。
2.远程拷贝数据:
ReduceTask同时启动多个MapOutputCopier线程,这些线程从ScheduledCopies列表中获取MapTask输出位置,并通过HttpGet远程拷贝数据,对于获
取的数据分片,如果大小超过一定的阀值,则存放到磁盘上,否则放到内存。
不管保存在内存中还是磁盘上,MapOutputCopier都会保存一个MapOutput对象来描述元数据
信息,如果数据保存到内存中,就将该对象保存到列表mapOutPutsFilesInMemory中,否则就保存到mapOutPutsFilesInDisks中。
MapOutputCopier线程的个数由
mapred.reduce.parallel.copies(默认为5)确定。
在ReduceTask中,大部分内存用于缓存从MapTask端拷贝的数据分片,这些内存占到JVMMaxHeadSize的
mapred.job.shuffle.input.buffer.percent(默认是0.7)倍,
3.合并内存文件和磁盘文件:
为了防止内存或者磁盘上的文件数据过多,ReduceTask启动LocalFSMerger和InMemFSMergeThread两个线程分别对内存和磁盘上的文件进行
合并。
即对存放在不同位置的文件分别进行合并。