目录
2.3 executor读取hive表时单task处理数据量/无shuffle作业小文件合并
2.4 GC优化(使用较少,当尝试其他调优方法均无效时可尝试此方法)
一、常用参数
-- map端小文件合并
set hive.merge.mapfiles=true;
-- reduce端小文件合并
set hive.merge.mapredfiles=true;
-- 小文件合并的阈值
set spark.sql.mergeSmallFileSize=128000000;
-- 小文件合并的task中,每个task读取的数据量
set spark.sql.targetBytesInPartitionWhenMerge=128000000;
-- 普通task读取的数据量,原来的值是33554432 (33M)
set spark.hadoopRDD.targetBytesInPartition=256000000;
-- 启动参数
set spark.sql.rangePartition.exchangeCoordinator=true;
-- 控制shuffle阶段每个task读取的数据量为256M
--spark2只对最后一个stage进行shuffle分区合并,spark3对中间的stage也生效,在spark2的时候有些同学会依赖spark.sql.adaptive.shuffle.targetPostShuffleInputSize进行小文件合并,在spark3上如果设置的这个参数,影响了中间stage可能会使作业的运行时间变长,我们对小文件的问题有专门的feature,可以设置下面两个参数
set spark.sql.adaptive.shuffle.targetPostShuffleInputSize=256000000;
二、正文
本文目标读者为对美团点评数据仓库和
Spark SQL运行原理
有基本了解的数据开发人员。
目前,Spark在美团点评的数仓生产中主要执行Hive表到Hive表的生产任务,尤其对有多轮shuffle的作业有很好的性能提升。美团点评用于ETL生产的Spark集群架设在HDFS、Yarn之上,依靠Hive metastore server进行元数据管理。
Spark完成一个数据生产任务(执行一条SQL)的基本过程如下:
(1)对SQL进行语法分析,生成逻辑执行计划——(2)从Hive metastore server获取表信息,结合逻辑执行计划生成并优化物理执行计划——(3)根据物理执行计划向Yarn申请资源(executor),调度task到executor执行。——(4)从HDFS读取数据,任务执行,任务执行结束后将数据写回HDFS。
本文从作业运行行为、executor处理能力、driver能力三个方面进行介绍。
作业运行行为主要影响(3)阶段中,task被如何调度,有shuffle时Reducer的个数等。
executor处理能力主要影响(3)阶段中task被调度到executor后,executor能否正常完成任务。
driver能力主要影响(2)(3)阶段。在(2)阶段中,如果用户表为ORC表,driver可能读取file footer等信息,会导致driver读取HDFS,如果这部分信息太大,则可能会造成driver存在内存压力。(3)阶段中,driver与Yarn RM交互,申请到executor后进行调度,task执行结束会生成一定执行指标信息和任务执行元数据信息返回给driver,同时每个executor还会和driver有心跳连接,这些都是driver运行的负载。
1 运行行为
1.1 动态生成分区
下列Hive参数对Spark同样起作用。
set hive.exec.dynamic.partition=true; // 是否允许动态生成分区
set hive.exec.dynamic.partition.mode=nonstrict; // 是否容忍指定分区全部动态生成
set hive.exec.max.dynamic.partitions = 100; // 动态生成的最多分区数
1.2 broadcast join
当大表JOIN小表时,如果小表足够小,可以将大表分片,分别用小表和每个大表的分片进行JOIN,最后汇总,能够大大提升作业性能。
spark.sql.autoBroadcastJoinThreshold
平台默认值为26214400(25M),如果小表的大小小于该值,则会将小表广播到所有executor中,使JOIN快速完成。如果该值设置太大,则会导致executor内存压力过大,容易出现OOM。
注:ORC格式的表会对数据进行压缩,通常压缩比为2到3左右,但有些表的压缩比就会很高,有时可以达到10。请妥善配置该参数,并配合spark.executor.memory,使作业能够顺利执行。
使用hint强制做broadcastjoin:
有时候,可能会遇到引擎无法识别表大小的情况,可以使用hint强制执行broadcast join,如下所示。Spark可以识别/*+ MAPJOIN(l) */和/*+ BROADCASTJOIN(u) */两种,Hive只能识别/*+ MAPJOIN(l) */,因此建议使用/*+ MAPJOIN(l) */。
代码块
SQL
select /*+ MAPJOIN(l) */ i.a, i.b, l.b
from tmp1 i join tmp2 l ON i.a = l.a;
1.3 动态资源分配
spark.dynamicAllocation.enabled
:是否开启动态资源分配,平台默认开启,同时强烈建议用户不要关闭。理由:开启动态资源分配后,Spark可以根据当前作业的负载动态申请和释放资源。
spark.dynamicAllocation.maxExecutors
: 开启动态资源分配后,同一时刻,最多可申请的executor个数。平台默认设置为1000。当在Spark UI中观察到task较多时,可适当调大此参数,保证task能够并发执行完成,缩短作业执行时间。
下图是一个由于并发不足导致作业执行较慢的一个明显的任务:
打开执行时间较长的stage,查看其任务数为2w+。
点击stage的链接,进入查看stage中的任务,将任务按照Launch Time排序,先有小到大再由大到小。
可以看到任务启动时间差了3个多小时。可以确定该任务是由于
spark.dynamicAllocation.maxExecutors
过小导致的。
该参数可以和spark.executor.cores配合增大作业并发度。s
spark.dynamicAllocation.minExecutors
: 和s,d,maxExecutors相反,此参数限定了某一时刻executor的最小个数。平台默认设置为3,即在任何时刻,作业都会保持至少有3个及以上的executor存活,保证任务可以迅速调度。
1.4 Shuflle相关
spark.sql.shuffle.partitions
: 在有JOIN或聚合等需要shuffle的操作时,从mapper端写出的partition个数,平台默认设置为2000。
如select a, avg(c) from test_table group by a语句,不考虑优化行为,如果一个map端的task中包含有3000个a,根据spark.sql.shuffle.partitions=2000,会将计算结果分成2000份partition(例如按2000取余),写到磁盘,启动2000个reducer,每个reducer从每个mapper端拉取对应索引的partition。
当作业数据较多时,适当调大该值,当作业数据较少时,适当调小以节省资源。
spark.sql.adaptive.enabled
:是否开启调整partition功能,如果开启,spark.sql.shuffle.partitions设置的partition可能会被合并到一个reducer里运行。平台默认开启,同时强烈建议开启。理由:更好利用单个executor的性能,还能缓解小文件问题。
spark.sql.adaptive.shuffle.targetPostShuffleInputSize
:和spark.sql.adaptive.enabled配合使用,当开启调整partition功能后,当mapper端两个partition的数据合并后数据量小于targetPostShuffleInputSize时,Spark会将两个partition进行合并到一个reducer端进行处理。平台默认为67108864(64M),用户可根据自身作业的情况酌情调整该值。当调大该值时,一个reduce端task处理的数据量变大,最终产出的数据,存到HDFS上的文件也变大。当调小该值时,相反。
代码块
Plain Text
18/01/16 03:18:03 WARN TransportChannelHandler: Exception in connection from rz-data-hdp-dn3938.rz.sankuai.com/10.16.47.49:7337
io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216 byte(s) of direct memory (used: 2013265920, max: 2022178816)
at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:530)
at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:484)
at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:711)
at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:700)
at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:237)
at io.netty.buffer.PoolArena.allocate(PoolArena.java:221)
at io.netty.buffer.PoolArena.allocate(PoolArena.java:141)
at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:296)
at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:177)
at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:168)
at io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:129)
at io.netty.channel.AdaptiveRecvByteBufAllocator$HandleImpl.allocate(AdaptiveRecvByteBufAllocator.java:104)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:117)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
at java.lang.Thread.run(Thread.java:745)
以及
代码块
Plain Text
18/01/16 18:36:03 WARN TransportChannelHandler: Exception in connection from rz-data-hdp-dn0871.rz.sankuai.com/10.16.57.13:34925
java.lang.IllegalArgumentException: Too large frame: 3608642420
at org.spark_project.guava.base.Preconditions.checkArgument(Preconditions.java:119)
at org.apache.spark.network.util.TransportFrameDecoder.decodeNext(TransportFrameDecoder.java:133)
at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:81)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
at java.lang.Thread.run(Thread.java:745)
spark.sql.adaptive.minNumPostShufflePartitions
: 当
spark.sql.adaptive.enabled
参数开启后,有时会导致很多分区被合并,为了防止分区过少,可以设置
spark.sql.adaptive.minNumPostShufflePartitions
参数,防止分区过少而影响性能。
1.5 读ORC表优化
spark.hadoop.hive.exec.orc.split.strategy
参数控制在读取ORC表时生成split的策略。BI策略以文件为粒度进行split划分;ETL策略会将文件进行切分,多个stripe组成一个split;HYBRID策略为:当文件的平均大小大于hadoop最大split值(默认256 * 1024 * 1024)时使用ETL策略,否则使用BI策略。
对于一些较大的ORC表,可能其footer较大,ETL策略可能会导致其从hdfs拉取大量的数据来切分split,甚至会导致driver端OOM,因此这类表的读取建议使用BI策略。
对于一些较小的尤其有数据倾斜的表(这里的数据倾斜指大量stripe存储于少数文件中),建议使用ETL策略。
另外,
spark.hadoop.mapreduce.input.fileinputformat.split.maxsize
参数可以控制在ORC切分时stripe的合并处理。具体逻辑是,当几个stripe的大小大于spark.hadoop.mapreduce.input.fileinputformat.split.maxsize时,会合并到一个task中处理。可以适当调小该值,如set spark.hadoop.mapreduce.input.fileinputformat.split.maxsize=134217728。以此增大读ORC表的并发。
比如我想要以64M大小划分split,可以在XT ETL中设置如下参数:
代码块
SQL
set spark.hadoop.hive.exec.orc.split.strategy=ETL;
set spark.hadoop.mapreduce.input.fileinputformat.split.maxsize=64000000;
set spark.hadoop.mapreduce.input.fileinputformat.split.minsize=64000000;
-- 避免将拆分的split再合并
set spark.hadoopRDD.targetBytesInPartition=-1;
2 executor能力
2.1内存
spark.executor.memory
:executor用于缓存数据、代码执行的堆内存以及JVM运行时需要的内存。当executor端由于OOM时,多数是由于spark.executor.memory设置较小引起的。该参数一般可以根据表中单个文件的大小进行估计,但是如果是压缩表如ORC,则需要对文件大小乘以2~3倍,这是由于文件解压后所占空间要增长2~3倍。平台默认设置为2G。
spark.yarn.executor.memoryOverhead
:Spark运行还需要一些堆外内存,直接向系统申请,如数据传输时的netty等。
Spark根据spark.executor.memory+spark.yarn.executor.memoryOverhead的值向RM申请一个容器,当executor运行时使用的内存超过这个限制时,会被yarn kill掉。在Spark UI中相应失败的task的错误信息为:
代码块
Plain Text
Container killed by YARN for exceeding memory limits. XXX of YYY physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
这个时候,适当调大spark.yarn.executor.memoryOverhead。平台默认设置为1024(1G),注意:该参数的单位为MB。但是,如果用户在代码中无限制的使用堆外内存。调大该参数没有意义。需要用户了解自己的代码在executor中的行为,合理使用堆内堆外内存。
spark.sql.windowExec.buffer.spill.threshold:当用户的SQL中包含窗口函数时,并不会把一个窗口中的所有数据全部读进内存,而是维护一个缓存池,当池中的数据条数大于该参数表示的阈值时,spark将数据写到磁盘。该参数如果设置的过小,会导致spark频繁写磁盘,如果设置过大则一个窗口中的数据全都留在内存,有OOM的风险。但是,为了实现快速读入磁盘的数据,spark在每次读磁盘数据时,会保存一个1M的缓存。
举例:当spark.sql.windowExec.buffer.spill.threshold为10时,如果一个窗口有100条数据,则spark会写9((100 – 10)/10)次磁盘,在读的时候,会创建9个磁盘reader,每个reader会有一个1M的空间做缓存,也就是说会额外增加9M的空间。
当某个窗口中数据特别多时,会导致写磁盘特别频繁,就会占用很大的内存空间作缓存。因此如果观察到executor的日志中存在大量如下内容,则可以考虑适当调大该参数,平台默认该参数为40960。
代码块
Plain Text
pilling data because number of spilledRecords crossed the threshold
2.2 executor并发度
spark.executor.cores
:单个executor上可以同时运行的task数。Spark中的task调度在线程上,该参数决定了一个executor上可以并行执行几个task。这几个task共享同一个executor的内存(spark.executor.memory+spark.yarn.executor.memoryOverhead)。适当提高该参数的值,可以有效增加程序的并发度,是作业执行的更快,但使executor端的日志变得不易阅读,同时增加executor内存压力,容易出现OOM。在作业executor端出现OOM时,如果不能增大spark.executor.memory,可以适当降低该值。平台默认设置为1。
该参数是executor的并发度,和
spark.dynamicAllocation.maxExecutors
配合,可以提高整个作业的并发度。
2.3 executor读取hive表时单task处理数据量/无shuffle作业小文件合并
spark.hadoopRDD.targetBytesInPartition
:该参数是美团点评特有参数,目前还未反馈给社区。Spark在读取hive表时,默认会为每个文件创建一个task,如果一个SQL没有shuffle类型的算子,每个task执行完都会产生一个文件写回HDFS,这样就潜在存在小文件问题。该参数可以将多个文件放到一个task中处理,默认为33554432,即如果一个文件和另一个文件大小之和小于32M,就会被放到一个task钟处理。适当提高该值,可以降低调度压力,避免无shuffle作业产生过多小文件。
2.4 GC优化(使用较少,当尝试其他调优方法均无效时可尝试此方法)
executor的JVM参数传递方式为:set spark.executor.extraJavaOptions=”XXXXXXXXXX “。例如,set spark.executor.extraJavaOptions=”-XX:NewRatio=3 -XX:+UseG1GC”。
注:所有的JVM参数必须写在一起,不能分开。bad case:set spark.executor.extraJavaOptions=”-XX:NewRatio=3 “; set spark.executor.extraJavaOptions=”-XX:+UseG1GC ” ;
打开GC打印:-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
full GC 频繁:内存不够用,调大spark.executor.memory,调小spark.executor.cores。
minor GC频繁,而full GC比较少:可以适当提高Eden区大小-Xmn
如果OldGen区快要满了,适当提高spark.executor.memory(平台默认2G)或适当降低spark.memory.fraction(平台默认为0.3)或适当提高-
XX:NewRatio(老年代是年轻代的多少倍,一般默认是2)。
如果spark.executor.memory调的很大且GC仍是程序运行的瓶颈,可以尝试启用G1垃圾回收器(-XX:+UseG1GC)
修改了GC的参数一定要仔细观察GC的频率和时间。
修改方法:set spark.executor.extraJavaOptions=”-XX:NewRatio=3 -XX:+UseG1GC …”
3 driver指标:
3.1 内存
spark.driver.memory
:driver使用内存大小, 平台默认为10G,根据作业的大小可以适当增大或减小此值。
3.2 GC优化
通过set spark.driver.extraJavaOptions=”XXXXXXXXXX “设置,具体设置内容可参考2.4节,一般情况driver内存较大,可尝试启用G1垃圾回收器。