MapReduce篇
Hadoop解决大规模数据分布式计算的方案是MapReduce。MapReduce既是一个编程模型,又是一个计算框架。也就是说,开发人员必须基于MapReduce编程模型进行编程开发,然后将程序通过MapReduce计算框架分发到Hadoop集群中运行。我们先看一下作为编程模型的MapReduce。
说说MapReduce编程模型
MapReduce是一种非常简单又非常强大的编程模型。
简单在于其编程模型只包含map和reduce两个过程,map的主要输入是一对key,value值,经过map计算后输出一对key,value值;然后将相同key合并,形成key,value集合;再将这个key,value集合输入reduce,经过计算输出零个或多个key,value对。
但是MapReduce同时又是非常强大的,不管是关系代数运算(SQL计算),还是矩阵运算(图计算),大数据领域几乎所有的计算需求都可以通过MapReduce编程来实现。
我们以WordCount程序为例。WordCount主要解决文本处理中的词频统计问题,就是统计文本中每一个单词出现的次数。如果只是统计一篇文章的词频,几十K到几M的数据,那么写一个程序,将数据读入内存,建一个Hash表记录每个词出现的次数就可以了,如下图。
MapReduce作业启动和运行机制
我们以Hadoop1为例,MapReduce运行过程涉及以下几类关键进程:
大数据应用进程:启动用户MapReduce程序的主入口,主要指定Map和Reduce类、输入输出文件路径等,并提交作业给Hadoop集群。
JobTracker进程:根据要处理的输入数据量启动相应数量的map和reduce进程任务,并管理整个作业生命周期的任务调度和监控。JobTracker进程在整个Hadoop集群全局唯一。
TaskTracker进程:负责启动和管理map进程以及reduce进程。因为需要每个数据块都有对应的map函数,TaskTracker进程通常和HDFS的DataNode进程启动在同一个服务器,也就是说,Hadoop集群中绝大多数服务器同时运行DataNode进程和TaskTacker进程。
具体作业启动和计算过程如下:
应用进程将用户作业jar包存储在HDFS中,将来这些jar包会分发给Hadoop集群中的服务器执行MapReduce计算。
应用程序提交job作业给JobTracker。
JobTacker根据作业调度策略创建JobInProcess树,每个作业都会有一个自己的JobInProcess树。
JobInProcess根据输入数据分片数目(通常情况就是数据块的数目)和设置的reduce数目创建相应数量的TaskInProcess。
TaskTracker进程和JobTracker进程进行定时通信。
如果TaskTracker有空闲的计算资源(空闲CPU核),JobTracker就会给他分配任务。分配任务的时候会根据TaskTracker的服务器名字匹配在同一台机器上的数据块计算任务给它,使启动的计算任务正好处理本机上的数据。
TaskRunner收到任务后根据任务类型(map还是reduce),任务参数(作业jar包路径,输入数据文件路径,要处理的数据在文件中的起始位置和偏移量,数据块多个备份的DataNode主机名等)启动相应的map或者reduce进程。
map或者reduce程序启动后,检查本地是否有要执行任务的jar包文件,如果没有,就去HDFS上下载,然后加载map或者reduce代码开始执行。
如果是map进程,从HDFS读取数据(通常要读取的数据块正好存储在本机)。如果是reduce进程,将结果数据写出到HDFS。
3.HDFS中的文件大小设置,以及有什么影响?
HDFS中的文件在物理上是分块存储(block),块的大小可以通过配置参数(dfs.blocksize)来规定,默认大小在hadoop2.x版本中是M,老版本中是64M。
思考:为什么块的大小不能设置的太小,也不能设置的太大?
HDFS的块比磁盘的块大,其目的是为了最小化寻址开销。如果块设置得足够大,从磁盘传输数据的时间会明显大于定位这个块开始位置所需的时间。
因而,传输一个由多个块组成的文件的时间取决于磁盘传输速率。
如果寻址时间约为10ms,而传输速率为MB/s,为了使寻址时间仅占传输时间的1%,我们要将块大小设置约为MB。默认的块大小MB。增加文件块大小,需要增加磁盘的传输速率。
secondarynamenode工作机制
1)第一阶段:NameNode启动(1)第一次启动NameNode格式化后,创建fsimage和edits文件。如果不是第一次启动,直接加载编辑日志和镜像文件到内存。(2)客户端对元数据进行增删改的请求。(3)NameNode记录操作日志,更新滚动日志。(4)NameNode在内存中对数据进行增删改查。2)第二阶段:SecondaryNameNode工作(1)SecondaryNameNode询问NameNode是否需要checkpoint。直接带回NameNode是否检查结果。(2)SecondaryNameNode请求执行checkpoint。(3)NameNode滚动正在写的edits日志。(4)将滚动前的编辑日志和镜像文件拷贝到SecondaryNameNode。(5)SecondaryNameNode加载编辑日志和镜像文件到内存,并合并。(6)生成新的镜像文件fsimage.chkpoint。(7)拷贝fsimage.chkpoint到NameNode。(8)NameNode将fsimage.chkpoint重新命名成fsimage。
NameNode与SecondaryNameNode的区别与联系?
1)区别(1)NameNode负责管理整个文件系统的元数据,以及每一个路径(文件)所对应的数据块信息。(2)SecondaryNameNode主要用于定期合并命名空间镜像和命名空间镜像的编辑日志。2)联系:(1)SecondaryNameNode中保存了一份和namenode一致的镜像文件(fsimage)和编辑日志(edits)。(2)在主namenode发生故障时(假设没有及时备份数据),可以从SecondaryNameNode恢复数据。
ZKFailoverController主要职责
1)健康监测:周期性的向它监控的NN发送健康探测命令,从而来确定某个NameNode是否处于健康状态,如果机器宕机,心跳失败,那么zkfc就会标记它处于一个不健康的状态。
2)会话管理:如果NN是健康的,zkfc就会在zookeeper中保持一个打开的会话,如果NameNode同时还是Active状态的,那么zkfc还会在Zookeeper中占有一个类型为短暂类型的znode,当这个NN挂掉时,这个znode将会被删除,然后备用的NN,将会得到这把锁,升级为主NN,同时标记状态为Active。
3)当宕机的NN新启动时,它会再次注册zookeper,发现已经有znode锁了,便会自动变为Standby状态,如此往复循环,保证高可靠,需要注意,目前仅仅支持最多配置2个NN。
4)master选举:如上所述,通过在zookeeper中维持一个短暂类型的znode,来实现抢占式的锁机制,从而判断那个NameNode为Active状态。
7.Hadoop序列化和反序列及自定义bean对象实现序列化
1)序列化和反序列化(1)序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储(持久化)和网络传输。(2)反序列化就是将收到字节序列(或其他数据传输协议)或者是硬盘的持久化数据,转换成内存中的对象。(3)Java的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(各种校验信息,header,继承体系等),不便于在网络中高效传输。所以,hadoop自己开发了一套序列化机制(Writable),精简、高效。2)自定义bean对象要想序列化传输步骤及注意事项:(1)必须实现Writable接口(2)反序列化时,需要反射调用空参构造函数,所以必须有空参构造(3)重写序列化方法(4)重写反序列化方法(5)注意反序列化的顺序和序列化的顺序完全一致(6)要想把结果显示在文件中,需要重写toString(),且用"\t"分开,方便后续用(7)如果需要将自定义的bean放在key中传输,则还需要实现