卓航论坛

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 341|回复: 3
打印 上一主题 下一主题

分布式基础学习【二】 —— 分布式计算系统(Map/Reduce)

[复制链接]
[至尊红钻5级]发帖数量≥8000篇 [未点亮至尊黄钻]威望不足10点 [未点亮至尊蓝钻]在线时间不足10小时 [未点亮至尊绿钻]贡献度不足10点 [至尊紫钻4级]金币≥20000个 [未点亮至尊粉钻]精华贴数不足10贴 [未点亮至尊黑钻]活跃不足8个
 等级: 
 级别: 论坛元老
 UID:  7   [未点亮普号显示]钻石不足3个
 阁 分: 36567
 阁 望: 0
 阁 献: 0
 活 跃: 0
 发 贴: 12405 (0)
 阁 币: 24162  
性 别: I'm 火星人!
阅读权限: 90
在线时长: 0 小时
注册时间: 2016-10-16
最后登录: 2016-10-18
go
楼主
发表于 2016-10-17 16:24:16 |只看该作者 |倒序浏览
本帖发表于 2016-10-17 16:24:16...阅读 342 人...加油,亲爱的楼主:[db:作者]
二. 分布式计算(Map/Reduce)
分布式式计算,同样是一个宽泛的概念,在这里,它狭义的指代,按Google Map/Reduce 框架所设计的分布式框架。在Hadoop中,分布式文件系统,很大程度上,是为各种分布式计 算需求所服务的。我们说分布式文件系统就是加了分布式的文件系统,类似的定义推广到分 布式计算上,我们可以将其视为增加了分布式支持的计算函数。从计算的角度上看, Map/Reduce框架接受各种格式的键值对文件作为输入,读取计算后,最终生成自定义格式的 输出文件。而从分布式的角度上看,分布式计算的输入文件往往规模巨大,且分布在多个机 器上,单机计算完全不可支撑且效率低下,因此Map/Reduce框架需要提供一套机制,将此计 算扩展到无限规模的机器集群上进行。依照这样的定义,我们对整个Map/Reduce的理解,也 可以分别沿着这两个流程去看。。。
在Map/Reduce框架中,每一次计算请求,被称为作业。在分布式计算Map/Reduce框架中, 为了完成这个作业,它进行两步走的战略,首先是将其拆分成若干个Map任务,分配到不同的 机器上去执行,每一个Map任务拿输入文件的一部分作为自己的输入,经过一些计算,生成某 种格式的中间文件,这种格式,与最终所需的文件格式完全一致,但是仅仅包含一部分数据 。因此,等到所有Map任务完成后,它会进入下一个步骤,用以合并这些中间文件获得最后的 输出文件。此时,系统会生成若干个Reduce任务,同样也是分配到不同的机器去执行,它的 目标,就是将若干个Map任务生成的中间文件为汇总到最后的输出文件中去。当然,这个汇总 不总会像1 + 1 = 2那么直接了当,这也就是Reduce任务的价值所在。经过如上步骤,最终, 作业完成,所需的目标文件生成。整个算法的关键,就在于增加了一个中间文件生成的流程 ,大大提高了灵活性,使其分布式扩展性得到了保证。。。
I. 术语对照
和分布式文件系统一样,Google、Hadoop和....我,各执一种方式表述统一概念,为了保 证其统一性,特有下表。。。
文中翻译Hadoop术语Google术语相关解释
作业JobJob用户的每一个计算请求,就称为一个作业。
作业服务器JobTrackerMaster用户提交作业的服务器,同时,它还负责各个作业任务的分 配,管理所有的任务服务器。
任务服务器TaskTrackerWorker任劳任怨的工蜂,负责执行具体的任务。
任务TaskTask每一个作业,都需要拆分开了,交由多个服务器来完成,拆 分出来的执行单位,就称为任务。
备份任务Speculative TaskBuckup Task每一个任务,都有可能执行失败或者缓慢,为了降低为此付 出的代价,系统会未雨绸缪的实现在另外的任务服务器上执行同样一个任务,这就是备份任 务。

               
II. 基本架构
与分布式文件系统类似,Map/Reduce的集群,也由三类服务器构成。其中作业服务器,在 Hadoop中称为Job Tracker,在Google论文中称为Master。前者告诉我们,作业服务器是负责 管理运行在此框架下所有作业的,后者告诉我们,它也是为各个作业分配任务的核心。与 HDFS的主控服务器类似,它也是作为单点存在的,简化了负责的同步流程。具体的负责执行 用户定义操作的,是任务服务器,每一个作业被拆分成很多的任务,包括Map任务和Reduce任 务等,任务是具体执行的基本单元,它们都需要分配到合适任务服务器上去执行,任务服务 器一边执行一边向作业服务器汇报各个任务的状态,以此来帮助作业服务器了解作业执行的 整体情况,分配新的任务等等。。。
除了作业的管理者执行者,还需要有一个任务的提交者,这就是客户端。与分布式文件系 统一样,客户端也不是一个单独的进程,而是一组API,用户需要自定义好自己需要的内容, 经由客户端相关的代码,将作业及其相关内容和配置,提交到作业服务器去,并时刻监控执 行的状况。。。
同作为Hadoop的实现,与HDFS的通信机制相同,Hadoop Map/Reduce也是用了协议接口来 进行服务器间的交流。实现者作为RPC服务器,调用者经由RPC的代理进行调用,如此,完成 大部分的通信,具体服务器的架构,和其中运行的各个协议状况,参见下图。从图中可以看 到,与HDFS相比,相关的协议少了几个,客户端与任务服务器,任务服务器之间,都不再有 直接通信关系。这并不意味着客户端就不需要了解具体任务的执行状况,也不意味着,任务 服务器之间不需要了解别家任务执行的情形,只不过,由于整个集群各机器的联系比HDFS复 杂的多,直接通信过于的难以维系,所以,都统一由作业服务器整理转发。另外,从这幅图 可以看到,任务服务器不是一个人在战斗,它会像孙悟空一样招出一群宝宝帮助其具体执行 任务。这样做的好处,个人觉得,应该有安全性方面的考虑,毕竟,任务的代码是用户提交 的,数据也是用户指定的,这质量自然良莠不齐,万一碰上个搞破坏的,把整个任务服务器 进程搞死了,就因小失大了。因此,放在单独的地盘进行,爱咋咋地,也算是权责明确了。 。。

与分布式文件系统相比,Map/Reduce框架的还有一个特点,就是可定制性强。文件系统中 很多的算法,都是很固定和直观的,不会由于所存储的内容不同而有太多的变化。而作为通 用的计算框架,需要面对的问题则要复杂很多,在各种不同的问题、不同的输入、不同的需 求之间,很难有一种包治百病的药能够一招鲜吃遍天。作为Map/Reduce框架而言,一方面要 尽可能的抽取出公共的一些需求,实现出来。更重要的,是需要提供良好的可扩展机制,满 足用户自定义各种算法的需求。Hadoop是由Java来实现的,因此通过反射来实现自定义的扩 展,显得比较小菜一碟了。在JobConf类中,定义了大量的接口,这基本上是Hadoop Map/Reduce框架所有可定制内容的一次集中展示。在JobConf中,有大量set接口接受一个 Class的参数,通常它都有一个默认实现的类,用户如果不满意,则 可自定义实现。。。
               
III. 计算流程
如果一切都按部就班的进行,那么整个作业的计算流程,应该是作业的提交 -> Map任 务的分配和执行 -> Reduce任务的分配和执行 -> 作业的完成。而在每个任务的执行 中,又包含输入的准备 -> 算法的执行 -> 输出的生成,三个子步骤。沿着这个流程 ,我们可以很快的整理清晰整个Map/Reduce框架下作业的执行。。。
1、作业的提交
一个作业,在提交之前,需要把所有应该配置的东西都配置好,因为一旦提交到了作业服 务器上,就陷入了完全自动化的流程,用户除了观望,最多也就能起一个监督作用,惩治一 些不好好工作的任务。。。
基本上,用户在提交代码阶段,需要做的工作主要是这样的:
首先,书写好所有自定的代码,最起码,需要有Map和Reduce的执行代码。在Hadoop中, Map需要派生自Mapper接口,Reduce需要派生自Reducer接口。这里都是用的泛型,用以支持不同的键值类型。这两个接口都仅有一个方 法,一个是map,一个是reduce,这两个方法都直接受四个参数,前两个是输入的键和值相关 的数据结构,第三个是作为输出相关的数据结构,最后一个,是一个Reporter类的实例,实 现的时候可以利用它来统计一些计数。除了这两个接口,还有大量可以派生的接口,比如分 割的Partitioner接口。。。
然后,需要书写好主函数的代码,其中最主要的内容就是实例化一个JobConf类的对象, 然后调用其丰富的setXXX接口,设定好所需的内容,包括输入输出的文件路径,Map和Reduce 的类,甚至包括读取写入文件所需的格式支持类,等等。。。
最后,调用JobClient的runJob方法,提交此JobConf对象。runJob方法会先行调用到 JobSubmissionProtocol接口所定义的submitJob方法,将此作业,提交给作业服务器。接着 ,runJob开始循环,不停的调用JobSubmissionProtocol的getTaskCompletionEvents方法, 获得TaskCompletionEvent类的对象实例,了解此作业各任务的执行状况。。。
2、Map任务的分配
当一个作业提交到了作业服务器上,作业服务器会生成若干个Map任务,每一个Map任务, 负责将一部分的输入转换成格式与最终格式相同的中间文件。通常一个作业的输入都是基于 分布式文件系统的文件(当然在单机环境下,文件系统单机的也可以...),因为,它可以很 天然的和分布式的计算产生联系。而对于一个Map任务而言,它的输入往往是输入文件的一个 数据块,或者是数据块的一部分,但通常,不跨数据块。因为,一旦跨了数据块,就可能涉 及到多个服务器,带来了不必要的复杂性。。。
当一个作业,从客户端提交到了作业服务器上,作业服务器会生成一个JobInProgress对 象,作为与之对应的标识,用于管理。作业被拆分成若干个Map任务后,会预先挂在作业服务 器上的任务服务器拓扑树。这是依照分布式文件数据块的位置来划分的,比如一个Map任务需 要用某个数据块,这个数据块有三份备份,那么,在这三台服务器上都会挂上此任务,可以 视为是一个预分配。。。
关于任务管理和分配的大部分的真实功能和逻辑的实现,JobInProgress则依托 JobInProgressListener和TaskScheduler的子类。TaskScheduler,顾名思义是用于任务分配 的策略类(为了简化描述,用它代指所有TaskScheduler的子类...)。它会掌握好所有作业 的任务信息,其assignTasks函数,接受一个TaskTrackerStatus作为参数,依照此任务服务 器的状态和现有的任务状况,为其分配新的任务。而为了掌握所有作业相关任务的状况, TaskScheduler会将若干个JobInProgressListener注册到JobTracker中去,当有新的作业到 达、移除或更新的时候,JobTracker会告知给所有的JobInProgressListener,以便它们做出 相应的处理。。。
任务分配是一个重要的环节,所谓任务分配,就是将合适作业的合适任务分配到合适的服 务器上。不难看出,里面蕴含了两个步骤,先是选择作业,然后是在此作业中选择任务。和 所有分配工作一样,任务分配也是一个复杂的活。不良好的任务分配,可能会导致网络流量 增加、某些任务服务器负载过重效率下降,等等。不仅如此,任务分配还是一个无一致模式 的问题,不同的业务背景,可能需要不同的算法才能满足需求。因此,在Hadoop中,有很多 TaskScheduler的子类,像Facebook,Yahoo,都为其贡献出了自家用的算法。在Hadoop中, 默认的任务分配器,是JobQueueTaskScheduler类。它选择作业的基本次序是:Map Clean Up Task(Map任务服务器的清理任务,用于清理相关的过期的文件和环境...) -> Map Setup Task(Map任务服务器的安装任务,负责配置好相关的环境...) -> Map Tasks - > Reduce Clean Up Task -> Reduce Setup Task -> Reduce Tasks。在这个前提 下,具体到Map任务的分配上来。当一个任务服务器工作的游刃有余,期待获得新的任务的时 候,JobQueueTaskScheduler会按照各个作业的优先级,从最高优先级的作业开始分配。每分 配一个,还会为其留出余量,已被不时之需。举一个例子:系统目前有优先级3、2、1的三个 作业,每个作业都有一个可分配的Map任务,一个任务服务器来申请新的任务,它还有能力承 载3个任务的执行,JobQueueTaskScheduler会先从优先级3的作业上取一个任务分配给它,然 后再留出一个1任务的余量。此时,系统只能在将优先级2作业的任务分配给此服务器,而不 能分配优先级1的任务。这样的策略,基本思路就是一切为高优先级的作业服务,优先分配不 说,分配了好保留有余力以备不时之需,如此优待,足以让高优先级的作业喜极而泣,让低 优先级的作业感慨既生瑜何生亮,甚至是活活饿死。。。
确定了从哪个作业提取任务后,具体的分配算法,经过一系列的调用,最后实际是由 JobInProgress的findNewMapTask函数完成的。它的算法很简单,就是尽全力为此服务器非配 且尽可能好的分配任务,也就是说,只要还有可分配的任务,就一定会分给它,而不考虑后 来者。作业服务器会从离它最近的服务器开始,看上面是否还挂着未分配的任务(预分配上 的),从近到远,如果所有的任务都分配了,那么看有没有开启多次执行,如果开启,考虑 把未完成的任务再分配一次(后面有地方详述...)。。。
对于作业服务器来说,把一个任务分配出去了,并不意味着它就彻底解放,可以对此任务 可以不管不顾了。因为任务可以在任务服务器上执行失败,可能执行缓慢,这都需要作业服 务器帮助它们再来一次。因此在Task中,记录有一个TaskAttemptID,对于任务服务器而言, 它们每次跑的,其实都只是一个Attempt而已,Reduce任务只需要采信一个的输出,其他都算 白忙乎了。。。
               
3、Map任务的执行
与HDFS类似,任务服务器是通过心跳消息,向作业服务器汇报此时此刻其上各个任务执行 的状况,并向作业服务器申请新的任务的。具体实现,是TaskTracker调用 InterTrackerProtocol协议的heartbeat方法来做的。这个方法接受一个TaskTrackerStatus 对象作为参数,它描述了此时此任务服务器的状态。当其有余力接受新的任务的时候,它还 会传入acceptNewTasks为true的参数,表示希望作业服务器委以重任。JobTracker接收到相 关的参数后,经过处理,会返回一个HeartbeatResponse对象。这个对象中,定义了一组 TaskTrackerAction,用于指导任务服务器进行下一步的工作。系统中已定义的了一堆其 TaskTrackerAction的子类,有的对携带的参数进行了扩充,有的只是标明了下ID,具体不详 写了,一看便知。。。
当TaskTracker收到的TaskTrackerAction中,包含了LaunchTaskAction,它会开始执行所 分配的新的任务。在TaskTracker中,有一个TaskTracker.TaskLauncher线程(确切的说是两 个,一个等Map任务,一个等Reduce任务),它们在痴痴的守候着新任务的来到。一旦等到了 ,会最终调用到Task的createRunner方法,构造出一个TaskRunner对象,新建一个线程来执 行。对于一个Map任务,它对应的Runner是TaskRunner的子类MapTaskRunner,不过,核心部 分都在TaskRunner的实现内。TaskRunner会先将所需的文件全部下载并拆包好,并记录到一 个全局缓存中,这是一个全局的目录,可以供所有此作业的所有任务使用。它会用一些软链 接,将一些文件名链接到这个缓存中来。然后,根据不同的参数,配置出一个JVM执行的环境 ,这个环境与JvmEnv类的对象对应。
接着,TaskRunner会调用JvmManager的launchJvm方法,提交给JvmManager处理。 JvmManager用于管理该TaskTracker上所有运行的Task子进程。在目前的实现中,尝试的是池 化的方式。有若干个固定的槽,如果槽没有满,那么就启动新的子进程,否则,就寻找idle 的进程,如果是同Job的直接放进去,否则杀死这个进程,用一个新的进程代替。每一个进程 都是由JvmRunner来管理的,它也是位于单独线程中的。但是从实现上看,这个机制好像没有 部署开,子进程是死循环等待,而不会阻塞在父进程的相关线程上,父线程的变量一直都没 有个调整,一旦分配,始终都处在繁忙的状况了。
真实的执行载体,是Child,它包含一个main函数,进程执行,会将相关参数传进来,它 会拆解这些参数,并且构造出相关的Task实例,调用其run函数进行执行。每一个子进程,可 以执行指定个数量的Task,这就是上面所说的池化的配置。但是,这套机制在我看来,并没 有运行起来,每个进程其实都没有机会不死而执行新的任务,只是傻傻的等待进程池满,而 被一刀毙命。也许是我老眼昏花,没看出其中实现的端倪。。。
4、Reduce任务的分配与执行
比之Map任务,Reduce的分配及其简单,基本上是所有Map任务完成了,有空闲的任务服务 器,来了就给分配一个Job任务。因为Map任务的结果星罗棋布,且变化多端,真要搞一个全 局优化的算法,绝对是得不偿失。而Reduce任务的执行进程的构造和分配流程,与Map基本完 全的一致,没有啥可说的了。。。
但其实,Reduce任务与Map任务的最大不同,是Map任务的文件都在本地隔着,而Reduce任 务需要到处采集。这个流程是作业服务器经由此Reduce任务所处的任务服务器,告诉Reduce 任务正在执行的进程,它需要的Map任务执行过的服务器地址,此Reduce任务服务器会于原 Map任务服务器联系(当然本地就免了...),通过FTP服务,下载过来。这个隐含的直接数据 联系,就是执行Reduce任务与执行Map任务最大的不同了。。。
5、作业的完成
当所有Reduce任务都完成了,所需数据都写到了分布式文件系统上,整个作业才正式完成 了。此中,涉及到很多的类,很多的文件,很多的服务器,所以说起来很费劲,话说,一图 解千语,说了那么多,我还是画两幅图,彻底表达一下吧。。。
首先,是一个时序图。它模拟了一个由3个Map任务和1个Reduce任务构成的作业执行流程 。我们可以看到,在执行的过程中,只要有人太慢,或者失败,就会增加一次尝试,以此换 取最快的执行总时间。一旦所有Map任务完成,Reduce开始运作(其实,不一定要这样的... ),对于每一个Map任务来说,只有执行到Reduce任务把它上面的数据下载完成,才算成功, 否则,都是失败,需要重新进行尝试。。。

而第二副图,不是我画的,就不转载了,参见这里,它描述了整个Map/Reduce的服务器状 况图,包括整体流程、所处服务器进程、输入输出等,看清楚这幅图,对Map/Reduce的基本 流程应该能完全跑通了。有这几点,可能图中描述的不够清晰需要提及一下,一个是在HDFS 中,其实还有日志文件,图中没有标明;另一个是步骤5,其实是由TaskTracker主动去拉取 而不是JobTracker推送过来的;还有步骤8和步骤11,创建出来的MapTask和ReduceTask,在 Hadoop中都是运行在独立的进程上的。。。
               
IV. Map任务详请
从上面,可以了解到整个Map和Reduce任务的整体流程,而后面要啰嗦的,是具体执行中 的细节。Map任务的输入,是分布式文件系统上的,包含键值对信息的文件。为了给每一个 Map任务指定输入,我们需要掌握文件格式把它分切成块,并从每一块中分离出键值信息。在 HDFS中,输入的文件格式,是由InputFormat类来表示的,在JobConf中,它的 默认值是TextInputFormat类(见getInputFormat),此类是特化的 FileInputFormat子类,而FileInputFormat正是 InputFormat的子类。通过这样的关系我们可以很容易的理解,默认的文件格式 是文本文件,且键是LongWritable类型(整形数),值是Text类型(字符串)。仅仅知道文 件类型是不够的,我们还需要将文件中的每一条数据,分离成键值对,这个工作,是 RecordReader来做的。在TextInputFormat的getRecordReader方法中我们可以 看到,与TextInputFormat默认配套使用的,是LineRecordReader类,是特化的 RecordReader的子类,它将每一行作为一个记录,起始的位置 作为键,整行的字符串作为值。有了格式,分出了键值,还需要切开分给每一个Map任务。每 一个Map任务的输入用InputSplit接口表示,对于一个文件输入而言,其实现是FileSplit, 它包含着文件名、起始位置、长度和存储它的一组服务器地址。。。
当Map任务拿到所属的InputSplit后,就开始一条条读取记录,并调用用于定义的Mapper ,进行计算(参见MapRunner和MapTask的run方法),然后,输出。 MapTask会传递给Mapper一个OutputCollector对象,作为输出的数据结构。它 定义了一个collect的函数,接受一个键值对。在MapTask中,定义了两个OutputCollector的 子类,一个是MapTask.DirectMapOutputCollector,人如其名,它的实现确实 很Direct,直截了当。它会利用一个RecordWriter对象,collect一调用,就直 接调用RecordWriter的write方法,写入本地的文件中去。如果觉着 RecordWriter出现的很突兀,那么看看上一段提到的RecordReader,基本上,数据结构都是对应着的,一个是输入一个是输出。输出很对称也是由 RecordWriter和OutputFormat来协同完成的,其默认实现是 LineRecordWriter和TextOutputFormat,多么的眼熟啊。。。
除了这个非常直接的实现之外,MapTask中还有一个复杂的多的实现,是 MapTask.MapOutputBuffer。有道是简单压倒 一切,那为什么有很简单的实现,要琢磨一个复杂的呢。原因在于,看上去很美的往往带着 刺,简单的输出实现,每调用一次collect就写一次文件,频繁的硬盘操作很有可能导致此方 案的低效。为了解决这个问题,这就有了这个复杂版本,它先开好一段内存做缓存,然后制 定一个比例做阈值,开一个线程监控此缓存。collect来的内容,先写到缓存中,当监控线程 发现缓存的内容比例超过阈值,挂起所有写入操作,建一个新的文件,把缓存的内容批量刷 到此文件中去,清空缓存,重新开放,接受继续collect。。。
为什么说是刷到文件中去呢。因为这不是一个简单的照本宣科简单复制的过程,在写入之 前,会先将缓存中的内存,经过排序、合并器(Combiner)统计之后,才会写入。如果你觉 得Combiner这个名词听着太陌生,那么考虑一下Reducer,Combiner也就是一个Reducer类, 通过JobConf的setCombinerClass进行设置,在常用的配置中,Combiner往往就是用用户为 Reduce任务定义的那个Reducer子类。只不过,Combiner只是服务的范围更小一些而已,它在 Map任务执行的服务器本地,依照Map处理过的那一小部分数据,先做一次Reduce操作,这样 ,可以压缩需要传输内容的大小,提高速度。每一次刷缓存,都会开一个新的文件,等此任 务所有的输入都处理完成后,就有了若干个有序的、经过合并的输出文件。系统会将这些文 件搞在一起,再做一个多路的归并外排,同时使用合并器进行合并,最终,得到了唯一的、 有序的、经过合并的中间文件(注:文件数量等同于分类数量,在不考虑分类的时候,简单 的视为一个...)。它,就是Reduce任务梦寐以求的输入文件。。。
除了做合并,复杂版本的OutputCollector,还具有分类的功能。分类,是通过 Partitioner来定义的,默认实现是HashPartitioner,作业 提交者可以通过JobConf的setPartitionerClass来自定义。分类的含义是什么呢,简单的说 ,就是将Map任务的输出,划分到若干个文件中(通常与Reduce任务数目相等),使得每一个 Reduce任务,可以处理某一类文件。这样的好处是大大的,举一个例子说明一下。比如有一 个作业是进行单词统计的,其Map任务的中间结果应该是以单词为键,以单词数量为值的文件 。如果这时候只有一个Reduce任务,那还好说,从全部的Map任务那里收集文件过来,分别统 计得到最后的输出文件就好。但是,如果单Reduce任务无法承载此负载或效率太低,就需要 多个Reduce任务并行执行。此时,再沿用之前的模式就有了问题。每个Reduce任务从一部分 Map任务那里获得输入文件,但最终的输出结果并不正确,因为同一个单词可能在不同的 Reduce任务那里都有统计,需要想方法把它们统计在一起才能获得最后结果,这样就没有将 Map/Reduce的作用完全发挥出来。这时候,就需要用到分类。如果此时有两个Reduce任务, 那么将输出分成两类,一类存放字母表排序较高的单词,一类存放字母表排序低的单词,每 一个Reduce任务从所有的Map任务那里获取一类的中间文件,得到自己的输出结果。最终的结 果,只需要把各个Reduce任务输出的,拼接在一起就可以了。本质上,这就是将Reduce任务 的输入,由垂直分割,变成了水平分割。Partitioner的作用,正是接受一个键值,返回一个 分类的序号。它会在从缓存刷到文件之前做这个工作,其实只是多了一个文件名的选择而已 ,别的逻辑都不需要变化。。。
除了缓存、合并、分类等附加工作之外,复杂版本的OutputCollector还支持错误数据的 跳过功能,在后面分布式将排错的时候,还会提及,标记一下,按下不表。。。
               
V. Reduce任务详情
理论上看,Reduce任务的整个执行流程要比Map任务更为的罗嗦一些,因为,它需要收集 输入文件,然后才能进行处理。Reduce任务,主要有这么三个步骤:Copy、Sort、Reduce( 参见ReduceTask的run方法)。所谓Copy,就是从执行各个Map任务的服务器那里,收罗到本 地来。拷贝的任务,是由ReduceTask.ReduceCopier类来负责,它有一个内嵌类,叫 MapOutputCopier,它会在一个单独的线程内,负责某个Map任务服务器上文件的拷贝工作。 远程拷贝过来的内容(当然也可以是本地了...),作为MapOutput对象存在,它可以在内存 中也可以序列化在磁盘上,这个根据内存使用状况来自动调节。整个拷贝过程是一个动态的 过程,也就是说它不是一次给好所有输入信息就不再变化了。它会不停的调用 TaskUmbilicalProtocol协议的getMapCompletionEvents方法,向其父TaskTracker询问此作 业个Map任务的完成状况(TaskTracker要向JobTracker询问后再转告给它...)。当获取到相 关Map任务执行服务器的信息后,都会有一个线程开启,做具体的拷贝工作。同时,还有一个 内存Merger线程和一个文件Merger线程在同步工作,它们将新鲜下载过来的文件(可能在内 存中,简单的统称为文件...),做着归并排序,以此,节约时间,降低输入文件的数量,为 后续的排序工作减负。。。
Sort,排序工作,就相当于上述排序工作的一个延续。它会在所有的文件都拷贝完毕后进 行,因为虽然同步有做着归并的工作,但可能留着尾巴,没做彻底。经过这一个流程,该彻 底的都彻底了,一个崭新的、合并了所有所需Map任务输出文件的新文件,诞生了。而那些千 行万苦从其他各个服务器网罗过来的Map任务输出文件,很快的结束了它们的历史使命,被扫 地出门一扫而光,全部删除了。。。
所谓好戏在后头,Reduce任务的最后一个阶段,正是Reduce本身。它也会准备一个 OutputCollector收集输出,与MapTask不同,这个OutputCollector更为简单,仅仅是打开一 个RecordWriter,collect一次,write一次。最大的不同在于,这次传入RecordWriter的文 件系统,基本都是分布式文件系统,或者说是HDFS。而在输入方面,ReduceTask会从JobConf 那里调用一堆getMapOutputKeyClass、getMapOutputValueClass、getOutputKeyComparator 等等之类的自定义类,构造出Reducer所需的键类型,和值的迭代类型Iterator(一个键到了 这里一般是对应一组值)。具体实现颇为拐弯抹角,建议看一下Merger.MergeQueue, RawKeyValueIterator,ReduceTask.ReduceValuesIterator等等之类的实现。有了输入,有 了输出,不断循环调用自定义的Reducer,最终,Reduce阶段完成。。。
VI. 分布式支持
1、服务器正确性保证
Hadoop Map/Reduce服务器状况和HDFS很类似,由此可知,救死扶伤的方法也是大同小异 。废话不多说了,直接切正题。同作为客户端,Map/Reduce的客户端只是将作业提交,就开 始搬个板凳看戏,没有占茅坑的行动。因此,一旦它挂了,也就挂了,不伤大雅。而任务服 务器,也需要随时与作业服务器保持心跳联系,一旦有了问题,作业服务器可以将其上运行 的任务,移交给它人完成。作业服务器,作为一个单点,非常类似的是利用还原点(等同于 HDFS的镜像)和历史记录(等同于HDFS的日志),来进行恢复。其上,需要持久化用于恢复 的内容,包含作业状况、任务状况、各个任务尝试的工作状况等。有了这些内容,再加上任 务服务器的动态注册,就算挪了个窝,还是很容易恢复的。JobHistory是历史记录相关的一 个静态类,本来,它也就是一个干写日志活的,只是在Hadoop的实现中,对日志的写入做了 面向对象的封装,同时又大量用到观察者模式做了些嵌入,使得看起来不是那么直观。本质 上,它就是打开若干个日志文件,利用各类接口来往里面写内容。只不过,这些日志,会放 在分布式文件系统中,就不需要像HDFS那样,来一个SecondXXX随时候命,由此可见,有巨人 在脚下踩着,真好。JobTracker.RecoveryManager类是作业服务器中用于进行恢复相关的事 情,当作业服务器启动的时候,会调用其recover方法,恢复日志文件中的内容。其中步骤, 注释中写的很清楚,请自行查看。。。
2、任务执行的正确和速度
整个作业流程的执行,秉承着木桶原理。执行的最慢的Map任务和Reduce任务,决定了系 统整体执行时间(当然,如果执行时间在整个流程中占比例很小的话,也许就微不足道了... )。因此,尽量加快最慢的任务执行速度,成为提高整体速度关键。所使用的策略,简约而 不简单,就是一个任务多次执行。当所有未执行的任务都分配出去了,并且先富起来的那部 分任务已经完成了,并还有任务服务器孜孜不倦的索取任务的时候,作业服务器会开始炒剩 饭,把那些正在吭哧吭哧在某个服务器上慢慢执行的任务,再把此任务分配到一个新的任务 服务器上,同时执行。两个服务器各尽其力,成王败寇,先结束者的结果将被采纳。这样的 策略,隐含着一个假设,就是我们相信,输入文件的分割算法是公平的,某个任务执行慢, 并不是由于这个任务本身负担太重,而是由于服务器不争气负担太重能力有限或者是即将撒 手西去,给它换个新环境,人挪死树挪活事半功倍。。。
当然,肯定有哽咽的任务,不论是在哪个服务器上,都无法顺利完成。这就说明,此问题 不在于服务器上,而是任务本身天资有缺憾。缺憾在何处?每个作业,功能代码都是一样的 ,别的任务成功了,就是这个任务不成功,很显然,问题出在输入那里。输入中有非法的输 入条目,导致程序无法辨识,只能挥泪惜别。说到这里,解决策略也浮出水面了,三十六计 走位上,惹不起,还是躲得起的。在MapTask中的MapTask.SkippingRecordReader和ReduceTask里的ReduceTask.SkippingReduceValuesIterator,都 是用于干这个事情的。它们的原理很简单,就是在读一条记录前,把当前的位置信息,封装 成SortedRanges.Range对象,经由Task的reportNextRecordRange方法提交到TaskTracker上 去。TaskTracker会把这些内容,搁在TaskStatus对象中,随着心跳消息,汇报到JobTracker 上面。这样,作业服务器就可以随时随刻了解清楚,每个任务正读取在那个位置,一旦出错 ,再次执行的时候,就在分配的任务信息里面添加一组SortedRanges信息。MapTask或 ReduceTask读取的时候,会看一下这些区域,如果当前区域正好处于上述雷区,跳过不读。 如此反复,正可谓,道路曲折,前途光明啊。。。
VII. 总结
对于Map/Reduce而言,真正的困难,在于提高其适应能力,打造一款能够包治百病的执行 框架。Hadoop已经做得很好了,但只有真正搞清楚了整个流程,你才能帮助它做的更好。
分享到: QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏0 支持支持0 反对反对0

使用道具 举报

[至尊红钻3级]发帖数量≥1000篇 [未点亮至尊黄钻]威望不足10点 [未点亮至尊蓝钻]在线时间不足10小时 [未点亮至尊绿钻]贡献度不足10点 [未点亮至尊紫钻]金币不足100个 [未点亮至尊粉钻]精华贴数不足10贴 [未点亮至尊黑钻]活跃不足8个
 等级: 
 级别: 金牌会员
 UID:  2   [未点亮普号显示]钻石不足3个
 阁 分: 1290
 阁 望: 0
 阁 献: 0
 活 跃: 0
 发 贴: 1285 (0)
 阁 币: 5  
性 别: I'm 火星人!
阅读权限: 70
在线时长: 0 小时
注册时间: 2016-10-14
最后登录: 2016-10-18
沙发
发表于 2016-10-18 11:01:12 |只看该作者
本回复帖发表于 2016-10-18 11:01:12,感谢hjghjgj对本帖的认真回复,你的回复是对楼主莫大的鼓舞
路过,学习下

使用道具 举报

[至尊红钻2级]发帖数量≥100篇 [未点亮至尊黄钻]威望不足10点 [未点亮至尊蓝钻]在线时间不足10小时 [未点亮至尊绿钻]贡献度不足10点 [未点亮至尊紫钻]金币不足100个 [未点亮至尊粉钻]精华贴数不足10贴 [未点亮至尊黑钻]活跃不足8个
 等级: 
 级别: 注册会员
 UID:  28   [未点亮普号显示]钻石不足3个
 阁 分: 156
 阁 望: 3
 阁 献: 4
 活 跃: 0
 发 贴: 141 (0)
 阁 币: 5  
性 别: I'm 火星人!
阅读权限: 20
在线时长: 4 小时
注册时间: 2011-1-6
最后登录: 2016-10-21
板凳
发表于 2016-10-19 19:33:31 |只看该作者
本回复帖发表于 2016-10-19 19:33:31,感谢b7823282对本帖的认真回复,你的回复是对楼主莫大的鼓舞
这是什么东东啊

使用道具 举报

[至尊红钻2级]发帖数量≥100篇 [未点亮至尊黄钻]威望不足10点 [未点亮至尊蓝钻]在线时间不足10小时 [未点亮至尊绿钻]贡献度不足10点 [未点亮至尊紫钻]金币不足100个 [未点亮至尊粉钻]精华贴数不足10贴 [未点亮至尊黑钻]活跃不足8个
 等级: 
 级别: 注册会员
 UID:  53   [未点亮普号显示]钻石不足3个
 阁 分: 137
 阁 望: 9
 阁 献: 9
 活 跃: 0
 发 贴: 100 (0)
 阁 币: 10  
性 别: I'm 火星人!
阅读权限: 20
在线时长: 0 小时
注册时间: 2011-1-6
最后登录: 2016-10-21
地板
发表于 2016-10-21 22:39:14 |只看该作者
本回复帖发表于 2016-10-21 22:39:14,感谢bangat对本帖的认真回复,你的回复是对楼主莫大的鼓舞
沙发!沙发!

使用道具 举报

高级模式
B Color Image Link Quote Code Smilies
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

1、请认真发帖,禁止回复纯表情,纯数字等无意义的内容!帖子内容不要太简单!
2、提倡文明上网,净化网络环境!抵制低俗不良违法有害信息。
3、每个贴内连续回复请勿多余3贴,每个版面回复请勿多余10贴!
4、如果你对主帖作者的帖子不屑一顾的话,请勿回帖。谢谢合作!

手机版| 百度搜索:vkee.pw

2012-2015 卓航旗下 GMT+8, 2024-5-18 19:49 , Processed in 0.561356 second(s), 30 queries . Powered by Discuz! X3.2  

禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.如遇版权问题,请及时联系站长(QQ:5213513)

今天是: | 本站已经安全运行: //这个地方可以改颜色

快速回复 返回顶部 返回列表