大家好,我是后来。
Hive 作为大数据中数仓的重要框架,从速度贼慢的MR引擎,再到Tez,到如今的Spark,速度一直在提升。虽然一条Hive SQL会转换成Spark的几个job,以及会生成多少Stage,我们还不好判断,
但是Spark如何读取Hive表后会有多少个Task呢?
我们知道
Spark的Task数由partitions决定
,那么又如何决定呢?
- Hive在读取不可切片文件的时候只能由单个节点来读入所有数据,即使自己手动设置分区都不行
- 如果Hive表的每个分区的文件都是几M的可切片的小文件,那么spark在读取的时候,每个Task只处理这么小的文件不仅浪费资源还浪费时间,如何优化?
那我们从spark读取文件的源码开始分析:
//简单写个读取文件的语句
val words: RDD[String] = sc.textFile("xxxx",3)
我们从textfile()方法进入,
def textFile(
path: String,
//注意看这里的最小分区,如果textfile()方法中传了分区参数的话就会以传入的为准,否则就会使用默认值
//def defaultMinPartitions: Int = math.min(defaultParallelism, 2)
//我们通过源码发现,defaultMinPartitions 是默认并行度和2的最小值,而默认并行度=自己的cpu核数,所以分区最小值一般等于2
minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
assertNotStopped()
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
//把上面的minPartitions传到这个hadoopFile()方法中。
minPartitions).map(pair => pair._2.toString).setName(path)
}
于是我们从hadoopFile()进入
发现读取文件将调用的是HadoopRDD,而这里的
inputFormatClass
是Hive创建时指定的,默认不指定为
org.apache.hadoop.mapred.TextInputFormat
,同时注意这里的参数
minPartitions
,它是我们刚刚上面方法传过来的值。
这次,继续从HadoopRDD进入,然后检索minPartitions,看一下这个参数被哪个方法使用了。
经过检索,发现了以下的方法
//getSplits()获取切片数
val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)
那么继续从getSplits方法进入,然后通过Ctrl+h找到实现类,我们这里选择FileInputFormat
然后继续检索getSplits,然后找到了下面的这个方法
我们来看看它的源码:
public InputSplit[] getSplits(JobConf job, int numSplits)
throws IOException {
Stopwatch sw = new Stopwatch().start();
FileStatus[] files = listStatus(job);
// Save the number of input files for metrics/loadgen
job.setLong(NUM_INPUT_FILES, files.length);
long totalSize = 0; // compute total size
for (FileStatus file: files) { // check we have valid files
if (file.isDirectory()) {
throw new IOException("Not a file: "+ file.getPath());
}
totalSize += file.getLen();
}
//注意看这里,文件的总大小,直接除以之前获取到切片数,为2
long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);
//而上面的minSplitSize通过看源码发现private long minSplitSize = 1;
//从这里得到的minSize 也等于1
// generate splits
ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
NetworkTopology clusterMap = new NetworkTopology();
files是上面扫描的分区目录下的part-*****文件
for (FileStatus file: files) {
Path path = file.getPath();
long length = file.getLen();
if (length != 0) {
FileSystem fs = path.getFileSystem(job);
BlockLocation[] blkLocations;
if (file instanceof LocatedFileStatus) {
blkLocations = ((LocatedFileStatus) file).getBlockLocations();
} else {
blkLocations = fs.getFileBlockLocations(file, 0, length);
}
//判断文件是否可切割
if (isSplitable(fs, path)) {
// 这里获取的不是文件本身的大小,它的大小从上面的length就可以知道,这里获取的是HDFS文件块(跟文件本身没有关系)的大小
// HDFS文件块的大小由两个参数决定,分别是 dfs.block.size 和 fs.local.block.size
// 在HDFS集群模式下,由 dfs.block.size 决定,对于Hadoop2.0来说,默认值是128MB
// 在HDFS的local模式下,由 fs.local.block.size 决定,默认值是32MB
long blockSize = file.getBlockSize();// 128MB
// 这里计算splitSize,goalSize是textfile()方法中指定路径下的文件总大小,minSize为1
long splitSize = computeSplitSize(goalSize, minSize, blockSize);
//而这里computeSplitSize = Math.max(minSize, Math.min(goalSize, blockSize))
//所以如果文件大小>128M,那么splitSize 就等于128M,否则就等于文件大小
long bytesRemaining = length;
// 如果文件大小大于splitSize,就按照splitSize对它进行分块
// 由此可以看出,这里是为了并行化更好,所以按照splitSize会对文件分的更细,因而split会更多
//SPLIT_SLOP 为1.1,也就是说,如果文件大小是切片大小的1.1倍以下时,也会分到一个切片,而不会分为2个
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
length-bytesRemaining, splitSize, clusterMap);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
splitHosts[0], splitHosts[1]));
bytesRemaining -= splitSize;
}
//而当切到最后一个切片<1.1倍时,就会再追加一个切片。
//举个例子,假如文件大小为160M,因为160/128>1.1,所以切了一个之后,还剩32M
//32M/128<1.1,但是32M != 0 ,所以就会为这32M生成一个切片。
if (bytesRemaining != 0) {
String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length
- bytesRemaining, bytesRemaining, clusterMap);
splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
splitHosts[0], splitHosts[1]));
}
} else {
//这里指的是文件不可分割
String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap);
//在这里就makeSplit = new FileSplit(file, start, length, hosts);
//所以就是1个分区直接读取,所以假如这个文件的大小是500G不可分割的文件,
//那么只能是一个节点去读,只能用Spark的一个Task,容易数据倾斜。
splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1]));
}
} else {
//Create empty hosts array for zero length files
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("Total # of splits generated by getSplits: " + splits.size()
+ ", TimeTaken: " + sw.elapsedMillis());
}
return splits.toArray(new FileSplit[splits.size()]);
}
所以如果Hive表分区下如果有200个小文件,大小假如为5M,那么每个小文件就是一个split了,从而对应了一个Spark的partition,所以当有多个小文件分区时,Spark Task的数量也将直线上升,而自指定的partition数对小文件来说并不能解决问题,所以无法改变读取Hive表Task数多的情况。
而如果是大小是500G(远大于128M块大小的不可分割文件)不可分割的文件,那么只能是一个节点去读,只能用Spark的一个Task,容易数据倾斜。
那么再回到刚开始的问题?这两种场景如何优化:
- 根据实际的业务场景,公司的数据量比较大,每天会有若干G的数据,那么再存储时就不要使用不可分割的压缩方式,可以使用Lzo,或者bzip2
- 产生了多个小文件,就要在小文件的源头,也就是思考自己目前的Hive表的分区方式是否合理?到Hive的日志文件的滚动配置是否合理?最后就是要合并小文件。
最后我把源码的追溯过程放在了一张图上,方便大家查看。
扫码关注公众号“后来X大数据”,回复【电子书】,领取超多本pdf 【java及大数据 电子书】