hive小文件造成map多

  • Post author:
  • Post category:其他


问题现象:hive查询时生成了大量的map,损耗了过多的cpu资源,参数调配没有生效

问题分析:

hive的map数 是由设定的inputsplit size来决定,hive封装了hadoop给出了inputformat的接口,用于描述输入数据的格式,并交由hive.input.format参数所决定,其中包含了两种主要使用类型:

1:HiveInputFormat

2:CombineHiveInputFormat

对于combineHiveInputFormat的计算来说,经过的流程如下所述:(重点为标红字段)

public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {

//加载CombineFileInputFormatShim,这个类继承了org.apache.hadoop.mapred.lib.CombineFileInputFormat

CombineFileInputFormatShim combine = ShimLoader.getHadoopShims()

.getCombineFileInputFormat();

if (combine == null) {

//若为空则采用HiveInputFormat的方式,下同

return super.getSplits(job, numSplits);

}

Path[] paths = combine.getInputPathsShim(job);

for (Path path : paths) {

//若是外部表,则按照HiveInputFormat方式分片

if ((tableDesc != null) && tableDesc.isNonNative()) {

return super.getSplits(job, numSplits);

}

Class inputFormatClass = part.getInputFileFormatClass();

String inputFormatClassName = inputFormatClass.getName();

InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);

if (this.mrwork != null && !this.mrwork.getHadoopSupportsSplittable()) {

if (inputFormat instanceof TextInputFormat) {

if ((new CompressionCodecFactory(job)).getCodec(path) != null)

//在未开启hive.hadoop.supports.splittable.combineinputformat(MAPREDUCE-1597)参数情况下,对于TextInputFormat并且为压缩则采用HiveInputFormat分片算法

return super.getSplits(job, numSplits);

}

}

//对于连接式同上

if (inputFormat instanceof SymlinkTextInputFormat) {

return super.getSplits(job, numSplits);

}

CombineFilter f = null;

boolean done = false;

Path filterPath = path;

//由参数hive.mapper.cannot.span.multiple.partitions控制,默认false;如果没true,则对每一个partition创建一个pool,以下省略为true的处理;对于同一个表的同一个文件格式的split创建一个pool为combine做准备;

if (!mrwork.isMapperCannotSpanPartns()) {

opList = HiveFileFormatUtils.doGetWorksFromPath(

pathToAliases, aliasToWork, filterPath);

f = poolMap.get(new CombinePathInputFormat(opList, inputFormatClassName));

}

if (!done) {

if (f == null) {

f = new CombineFilter(filterPath);

combine.createPool(job, f);

} else {

f.addPath(filterPath);

}

}

}

if (!mrwork.isMapperCannotSpanPartns()) {

//到这里才调用combine的分片算法,继承了org.apache.hadoop.mapred.lib.CombineFileInputFormat extends 新版本CombineFileInputformat

iss = Arrays.asList(combine.getSplits(job, 1));

}

//对于sample查询特殊处理

if (mrwork.getNameToSplitSample() != null && !mrwork.getNameToSplitSample().isEmpty()) {

iss = sampleSplits(iss);

}

//封装结果返回

for (InputSplitShim is : iss) {

CombineHiveInputSplit csplit = new CombineHiveInputSplit(job, is);

result.add(csplit);

}

return result.toArray(new CombineHiveInputSplit[result.size()]);

}

为TextInputFormat,并非实际存储的压缩格式,因为缺少相关参数设定造成了配置失效,开始走hiveinputformat计算 。

public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {

//扫描每一个分区

for (Path dir : dirs) {

PartitionDesc part = getPartitionDescFromPath(pathToPartitionInfo, dir);

//获取分区的输入格式

Class inputFormatClass = part.getInputFileFormatClass();

InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);

//按照相应格式的分片算法获取分片

//注意:这里的Inputformat只是old version API:org.apache.hadoop.mapred而不是org.apache.hadoop.mapreduce,因此不能采用新的API,否则在查询时会报异常:Input format must implement InputFormat.区别就是新的API的计算inputsplit size(Math.max(minSize, Math.min(maxSize, blockSize))和老的(Math.max(minSize, Math.min(goalSize, blockSize)))不一样;

InputSplit[] iss = inputFormat.getSplits(newjob, numSplits / dirs.length);

for (InputSplit is : iss) {

//封装结果,返回

result.add(new HiveInputSplit(is, inputFormatClass.getName()));

}

}

return result.toArray(new HiveInputSplit[result.size()]);

}

解决方案:

设定hive.hadoop.supports.splittable.combineinputformat参数

相关全部参数:

set hive.merge.mapfiles=true;

set hive.hadoop.supports.splittable.combineinputformat=true;

set hive.merge.size.per.task=2147483648;

set hive.merge.smallfiles.avgsize=2147483648;

set hive.merge.size.smallfiles.avgsize=2147483648;

set mapreduce.input.fileinputformat.split.maxsize=2147483648;

set mapred.max.split.size=2147483648;

set mapred.min.split.size.per.node=2147483648;

set mapred.min.split.size.per.rack=2147483648;

set hive.exec.reducers.bytes.per.reducer=2147483648;

set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

验证结果:

map数由11236降低至2363个map,证明参数修改生效。

注:设定过程中需要注意调配输入数据量大小,防止单个map输入数据量过多造成运行缓慢。相关数值需合理调配



版权声明:本文为u010824946原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。