Hadoop2.0的核心包括分布式文件管理系统(HDFS)、资源管理和调度框架YARN和分布式计算框架MapReduce.
-
HDFS是一个具有高容错性的文件系统,适合部署在廉价的机器上,并且能够提供高吞吐量的数据访问,非常适合
大规模数据集
上的应用。MapReduce、Spark等大数据 处理框架要处理的数据源大部分都存储再HDFS上,Hive、HBase等框架的数据通常也存储在HDFS上。简而言之,HDFS为大数据存储提供了保障。 -
YARN解决了Hadoop1.0资源利用率低和不能兼容异构计算框架等多种问题,
提供了资源隔离方案和双调度器解决方案
,可在YARN上运行MapReduce、Spark、Storm、Tez等各种不同类型的计算框架。 -
MapReduce是一个分布式、并行处理的编程模型。开发人员可以在不了解分布式系统底层框架设计原理和缺少并行应用开发经验的情况下,就能使用MapReduce计算框架快速轻松地编写出分布式并行程序,完成对大规模数据集的并行计算。
MapReduce利用函数式编程思想,将复杂的、并行域大规模集群上的并行计算高度抽象为Map和Reduce两个函数,其中Map是对可以并行处理的小数据集进行本地计算并输出中间结果,Reduce是对各个Map的输出结果进行汇总计算得到最终结果。
-
Hive是一个基于Hadoop的数据仓管理工具。
Hive可以让不熟悉Hadoop的开发人员直接编写出SQL语句,实现对大规模数据的统计分析操作。此外,
Hive还可以将SQL语句转换成MapReduce作业,并提交到Hadoop集群上运行
,Hive大大降低了学习门槛,同时也提升了开发效率。
一、HDFS
HDFS采用主从架构,主要有NameNode和DataNode组成。
NameNode作为管理节点,主要存储每个文件的块信息,并控制数据的读写过程,而DataNode作为数据节点,主要用于存储真实数据。
HDFS通过FsIMage和EditLog两个主要元数据文件来管理整个文件系统,理解这些文件的作用才能掌握HDFS的文件存储机制。
HDFS中的数据以文件块Block的形式存储。Block是最基本的存储单位,每次读写的最小单元是一个Block。HDFS采用多副本方式对数据进行冗余存处,通常一个数据块的多个副本会被分不到不同的DataNode上。
作为分布式文件存储系统,HDFS设计和实现了多种机制来保证可靠性,即系统出错时尽可能保证数据不丢失或损坏。除了基本的元数据备份,HDFS还提供了其他多种技术和方法来提高文件系统的可靠性,例如,建立Secondary NameNode和NameNode协同工作机制;创建NameNode的完整备份Backup Node,以便在NameNode故障时进行切换;使用HDFS NameNode HA机制解决NameNode机制解决单点故障问题;使用HDFS Federation联邦机制实现集群扩散性和良好隔离性;使用HDFS Snapshots快照机制来防止用户误操作、备份、灾难恢复。
二、MapReduce
MapReduce是Hadoop生态中的一款分布式计算框架,它提供了非常完善的分布式框架,可以让不熟悉分布式计算的人员也能编写出优秀的分布式系统。MapReduce就是“任务的分解与结果的汇总”。采用“
分而治之
”的思想,可以先将一个大型任务拆分成若干个简单的子任务,然后将每个子任务交给一个独立的结点去处理(Map阶段)。当所有结点的子任务都处理完毕后,再汇总所有子任务的处理结果(Reduce阶段),从而形成最终的结果。
“拆分”任务的过程称为Map阶段,“汇总”任务的过程称为Reduce阶段。
MapReduce1.0采用的是经典的Master/Slave结构,Master表现为JobTracker进程,而Slaver表现为TaskTracker. 由于存在种种问题,研究人员对MapReduce体系架构进行了重新设计,生成了MapReduce2.0和YARN.
MapReduce作业的执行流程主要分为InputFormat、Map、Shuffle、Reduce、OutputFormat五个阶段。其中,Map阶段的业务代码需要继承自org.apache.hadoop.mapreduce.Mapper类;Reduce阶段的业务代码需要继承自org.apache.hadoop.mapreduce.Reduce类;Shuffle阶段是MapReduce的心脏,关乎整个框架性能,可以对Map的输出进行一定的排序(Sort)、分区(Partition)、合并(Combine)等操作,得到<key, List(value)>形式的中间结果,再交给Reduce进行处理。接下来对MapReduce作业各个
执行阶段
进行比较详细的说明。
2.1 MapReduce工作流程
(1) InputFormat:InputFormat模块首先
对输入数据做预处理
,比如验证输入格式是否符合输入定义;然后
将输入文件切分
为逻辑上的多个InputSplit(InputSplit是MapReduce对文件进行处理和运算的输入单位,并没有对文件进行实际切割);由于InputSplit是逻辑切分而非物理切分,所以还需要通过RecordReader根据InputSplit中的信息来处理InputSplit中的具体记录,加载数据并转换为适合Map任务读取的键值对<key, value>,输入给Map任务。
(2) Map:Map模块会根据
用户自定义的映射规则
,输出一系列的<key, value>作为
中间结果
。
(3) Shuffle:为了让Reduce可以并行处理Map的结果,需要
对Map的输出进行一定的排序、分区、合并、归并等操作
,得到<key, value>形式的中间结果,再交给对应的Reduce进行处理。这个过程叫做Shuffle。
(4) Reduce :Reduce以一些列的<key, List(value)>中间结果作为输入,
执行用户定义的逻辑
,输出<key, value>形式的结果给OutputFormat.
(5) OutputFormat:OutputFormat模块会
验证
输入目录是否已经存在以及输出结果类型是否符合配置文件中的配置类型,如果都满足,就将Reduce的结果输出到分布式文件系统。
需要注意的是,用MapReduce来处理的数据集必须具备这样的特点:待处理的数据集可以分解成许多个小的数据集,而且每个小的数据集都可以完全并行地进行处理。这也是MapReduce的局限。
MapReduce是集群运算,在网络中传输的数据必须是可序列化的数据类型,不同于JAVA内置数据类型。MapReduce在执行中会遵循一系列的默认规则,例如默认以字典顺序对数据进行排序,根据默认规则进行分区等。我们也可以对这些默认规则进行自定义设置,从而以自定义组件的形式运行MapReduce程序。
目前,比较常见的分布式计算框架除了Apache Hadoop MapReduce之外,还有Apache Spark、Apache Storm、Apache Flink等。
2.2 MapReduce调优
在MapReduce执行期间,可能会出现运行速度太慢等性能较低的情况。以下是造成性能较低的一些常见原因以及相应的结果方案。
(1) 输入数据中存在大量小文件
问题:MapReduce默认使用的输入类TextInputformat会将每一个小文件作为一个独立的文件切片,并且会将每个文件切片交给一个maptask处理。因此,大量的小文件就会导致MapReduce产生大量的maptask,从而导致MapReduce的整体效率低下。
解决:可以在maptask处理之前将小文件进行合并,然后将合并后的文件进行处理。合并小文件可以使用程序语言、软件工具进行合并,也可以使用MapReduce提供的CombineFileInputFormat或自定义MapReduce的执行方式。
(2) 减少MapReduce各阶段数据传输的次数
问题:默认情况下,数据会从Map结点通过网络传输到Reduce节点。但如果Map结点存在大量数据,就会造成大量数据需要经由网络传输的后果。
解决:我们可以先将各个Map结点的数据在本地处理,然后再将各个Map节点本地处理的结果经由网络传输到Reduce进行汇总。(但要注意,并不是所有的业务逻辑都适合现在Map阶段处理)
(3) 数据压缩
问题:除了减少数据在网络的传输次数以外,还可以减少每次传输的数据容量。
解决:可以在Map端开启压缩功能并设置压缩方式,然后再在Reduce端开始解压缩并设置解压缩方式。注意,压缩和解压缩的方式必须保持一致。
(4) 避免数据倾斜
问题:如果某个任务在经过Shuffle处理后,将大量数据集中在一个Reduce上,就会造成该Reduce非常繁忙、而其他Reduce又过于空闲的情况。这种任务不均衡的情况也会拖慢整个MapReduce的执行周期。
解决:方法很多,可以使用抽样统计,自定义Combine组件、将Reduce Join改成Map Join等方式。
(5) 参数调优
在搭建MapReduce环境时,需要配置很多配置文件,这些配置文件中可以设置很多MapReduce运行参数。因此可以通过调整这些参数值来改变MapReduce的整体设置,从而改变MapReduce在运行时的性能情况。
三、YARN
YARN包括ResourceManager、ApplicationMaster和NodeManager,其中
ResourceManager负责资源管理,ApplicationMaster负责任务调度和任务监控,NodeManager负责定期向JobTracker汇报本节点的健康状况、资源使用情况、任务执行情况和接受来自JobTracker的命令并执行
。且原资环被划分的Slot重新设计为
容器Container
,NodeManager能够启动和监控容器Container。
3.1 YARN的体系架构
YARN采用主从架构(Master/Slave),其核心组件包括ResourceManager、NodeManager和ApplicationMaster三个。其中ResourceManager是主进程,NodeManager是从进程,一个ResourceManager对应多个NodeManager,每个应用程序拥有一个ApplicationMaster。此外,YARN中引入了一个逻辑概念——容器(Container),它将各类资源(如CPU、内存)抽象化,方便从节点NodeManager管理本地资源。YARN各组成部分的功能介绍如下:
(1) Client:负责向ResourceManager提交任务、终止任务等。
(2) ResourceManager:一个集群只有一个,负责集群资源的统一管理和调度,具体功能包括:(a) 处理来自客户端的请求,包括启动和终止应用程序;(b) 启动/监控ApplicationMaster。一旦某个ApplicationMaster出现故障,ResourceManager会在另一个节点上启动该ApplicationMaster;(c) 监控NodeManager,接收NodeManager汇报的心跳信息并分配任务给NodeManager去执行。一旦某个NodeManager出现故障,标记该NodeManager的任务,并告诉对应的ApplicationMaster如何处理。
(3) NodeManager:整个集群有很多各NodeManager,负责单节点资源的管理和使用,具体承担功能包括:(a) 周期性向ResourceManager汇报本节点上的资源使用情况和各个Container的运行状态;(b) 接收并处理来自ResourceManager的Container启动/停止的各种命令;(c) 出来来自ApplicationMaster的命令。
(4) ApplicationMaster:每个应用程序拥有一个ApplicationMaster,负责管理应用程序,具体承担的功能包括:(a) 数据切分;(b) 为应用程序/作业向ResourceManager申请资源(Container),并分配给内部任务;(c) 与NodeManager通信,以启动/停止任务;(d) 任务监控和容错,在任务执行失败时重新为该任务申请资源并重启任务;(e) 接收并处理ResourceManager发出的命令,入终止Container、重启NodeManager等。
(5) Container:由ApplicationMaster向ResourceManager申请,由后者中的资源调度器异步分配给ApplicationMaster。另外,一个应用程序所需的Container分为以下两大类:(a) 运行ApplicationMaster的Container。这是由ResourceManager和其内部的资源调度器申请和启动的。用户提交应用程序时,可指定唯一的ApplicationMaster所需的资源;(b) 运行各类任务的Container。这是由ApplicationMaster向ResourceManager申请的,并由ApplicationMaster与NodeManager通信以启动。该类Container上运行的任务类型可以是Map Task、Reduce Task或Spark Task等。以上两类Container可能在任意节点上,它们的位置通常而言是随机的,即ApplicationMaster可能与它管理的任务运行在一个节点上。
3.2 YARN的工作流程
在YARN框架中执行一个MapReduce应用程序时,从提交到完成需要经历的步骤如下:
(1) Client向YARN提交MapReduce应用程序,提交的内容包括ApplicationMaster程序、启动ApplicationMaster的命令、用户程序等。
(2) ResourceManager接收到Client应用程序请求后,为应用程序分配第一个Container,并于对应的NodeManager通信,要求它在这个Container中启动该应用程序的ApplicationMaster。
(3) ApplicationMaster被创建后会首先向ResourceManager注册,从而使得用户可以直接通过ResourceManager查询应用程序得运行状态。接下来得(4)-(7)是具体应用程序得执行步骤。
(4) ApplicationMaster采用轮询得方式通过RPC请求向ResourceManager申请资源。
(5) ResourceManager以“容器Container”的形式向提出申请ApplicationMaster分类资源。一旦ApplicationMaster申请到资源,便与对应的NodeManager通信,要求它启动任务。
(6) 当ApplicationMaster要求容器启动任务时,它会为任务设置好运行环境,然后将任务启动命令写到一个脚本中,最后NodeManager在容器中运行该脚本以启动任务。
(7) 各个任务通过RPC协议向ApplicationMaster汇报自己的状态和进度,以便ApplicationMaster随时掌握各个任务的运行状态,以便在任务失败时重启任务。在应用程序运行过程中,用户可以随时通过RPC向ApplicationMaster查询应用程序当前运行状态。
(8) 应用程序运行完毕后,ApplicationMaster向ResourceManager的应用程序管理器ApplicationManager注销并关闭自己。若ApplicationMaster因故失败,ResourceManager中的应用程序管理器ApplicationManager会检测到失败的情形,然后将其重新启动,直到所有的任务都执行完毕为止。
四、Hive
Hive是建立在Hadoop上的一个开源的数据仓库工具,主要适用于离线分析。Hive定义了简单的类SQL查询语言,称为HiveSQL。
Hive的本质是将HiveQL语句转化成MapReduce程序,并提交到Hadoop集群上运行。
它允许用户编写SQL语句实现对大规模数据的统计分析操作,也与允许熟悉MapReduce的开发者开发自定义的Mapper和Reducer来处理内建Mapper和Reducer无法完成的复杂的分析工作。和传统关系型数据库相比,
Hive具有如下特征:查询语言和SQL接近;并行执行(有些查询没有MR任务);使用HDFS存储;支持多种数据格式;不支持数据更新;不支持索引;执行延迟高;可扩展性高;数据规模大。
- Hive在加载的过程中不会对数据本身进行任何修改,只是将数据内容复制或者移动到相应的HDFS目录中。而在数据库中,不同的数据库有不同的存储引擎,定义了自己的数据格式。所有数据都会按照一定的组织存储,因此,数据库加载数据的过程会比较耗时。
- Hive要访问数据中满足条件的特定值时,需要暴力搜索整个数据,因此访问延迟较高。由于MapReduce的引入,Hive可以并行访问数据,因此即使没有索引,对于大规模数据的访问,Hive仍然可以体现出优势。
-
Hive在查询数据的时候,由于没有索引,需要扫描整个表,因此延迟较高。另外由于MapReduce本身具有较高的延迟,因此在利用其执行Hive查询时,也会有较高的延迟。但是当数据规模超过数据库处理能力的时候,Hive的并行计算优势就能够显现出来了。
此外,Hive不适合在线数据查询。
Hive的体系架构中主要包括如下组件:CLI、JDBC/ODBC、Thrift Serve、HWI、Metastore和Driver。这些组件可以分为客户端组件和服务端组件两类。另外,Hive还需要Hadoop的支持,它使用HDFS进行存储,使用MapReduce进行计算。
4.1 客户端组件
(1) CLI:Hive命令行接口,是最常用的一种用户接口。CLI是和Hive进行交互的最简单也是最常用的方式,只需要在一个具备完整Hive的环境下的Shell终端中键入hive即可启动服务。不过Hive CLI不适应于高并发的生产环境,仅仅是Hive管理员的好工具。
(2) JDBSC/ODBC:定义了一系列JAVA访问各类数据库的访问接口。
(3) HWI:Hive的web访问接口,提供了一种可以通过浏览器来访问Hive服务的功能。
4.2 服务端组件
(1) Thrift Server:Hive集成了Thrift Serve服务,能让JAVA、Python等不同的编程语言接口调用Hive接口。
(2) 元数据:元数据(Metastore)组件用于存储Hive的元数据,包括表名、表所属的数据、表的拥有者、列/分区字段、表的类型(是否为外部表)、表的数据所在目录等。推荐使用MySQL存储Metastore(快速响应数据存取的需求)。
(3) 驱动器:
驱动器组件的作用是将用户编写的HiveQL语句进行解析、编译、优化,胜场执行计划,然后调用底层的MapReduce计算框架。
驱动器分为四个部分:(a)解析器:将SQL字符串转换成抽象语法树AST,对AST进行语法分析;(b)编译器:将AST编译生成逻辑执行计划;(c)优化器:对逻辑执行计划进行优化;(d)执行器:把逻辑执行计划转化成可以运行的物理计划对于Hive来说,就是MapReduce/Spark。
Hive的基本数据类型是可以由低到高进行隐式转换的。但是不会进行反向转换,但某些情况下可以使用CAST函数进行反向转化。此外,Hive支持多种文件格式:TEXTFILE、SEQUENCEFILE、RCFILE、ORC。
4.3 Hive的数据模型
Hive中包含
表、分区和桶
三种数据类型,如图所示,表->分区->桶,对数据的划分粒度越来越小。
Hive中表分为
内部表和外部表
。内部表创建时会把数据存储在默认路径下,当删除一个内部表时,Hive会将数据和元数据全部删掉,同时删除掉这个数据目录。而Hive在创建外部表时需要指定数据读取的目录,外部表仅仅记录数据所在的路径,不对数据的位置做任何改变,当删除表时,外部表指挥删除元数据,数据晚间不会删除。
分区表实际上就是一个对应HDFS文件系统上一个独立文件夹,该文件夹下是该分区所有的数据文件。Hive中的
分区实际上就是分目录
,把一个大的数据集根据业务需要分割成小的数据集。分区的好处是可以让数据按照区域进行分类,避免了查询时的全表扫描。
分桶就是将同一个目录下的
一个文件拆分成多个文件
,每个文件包含数据的一部分,方便获取值,提高检索效率。分区针对的是数据的存储路径,分桶针对的是数据文件。分区提供一个隔离数据和优化查询的便利方式,但并非所有的数据集都可形成合理的分区;分桶是将数据分解成更容易管理的若干部分的另一种技术。
Hive通过
某列的HASH值取模来决定桶的分配
。使用桶的原因有两个:一是方便JOIN连接操作,连接时要求属于同一个连接键的数据在一个分区中。二是分桶使采样过程更加高效,从而降低Hive的查询时间。
4.4 Hive的调优策略
- Map任务的数据不需要单独设置,建议通过设置Block的最小和最大值来改变Map任务的数量。Reducer的个数通常由当前的应用环境决定,需要进行多次测试,已选择最佳的数量。
- 为了减轻网络传输压力,可以使用压缩技术对MapReduce中需要传输的数据进行压缩。通常压缩可以提高磁盘的输入/输出销量,但这也会增加CPU的计算开销,需要权衡。
- 建议开启分布式缓存以保留计算任务的中间结果集,主要是通过设置auto.convert.join=true,以提高表的连接效率。
- 根据具体业务需要,提取和预处理部分表数据,以提高查询计算效率。
- 提高Hive任务的并行性,设置并行参数hive.exec.parllel为true,并设置线程数量hive.exec,parallel.thread.number为CPU的实际线程数量。
- 建议关闭预测执行。当数据分片存在倾斜时,Hive会把执行时间长的任务当作失败,进而再产生一个相同的任务去执行,反而会降低执行效率。
- 设置JAVA虚拟机JVM重用,即允许一个JVM运行多个任务,来节省虚拟机的初始时间。但不要将每个虚拟机运行的任务个数设置太多,否则会降低任务的相应时间。
- 优化JOIN的连接操作。编写带有JOIN的HiveQL语句时,应该将字段少的表或者子查询放在JOIN操作符的左边,因为再规约Reduce阶段,左边的数据会被放入内存,这样能够节约内存空间。对于同一个关键词Key,对应值小的应该放到JOIN前面,大的放到JOIN后面。
以上是菜鸟学习Hadoop的基础笔记,结合Hive有了一个整体大概的了解。