When Spark Streaming loads data into Hive, or when a Spark batch job is run with a very large partition count, the Hive table ends up with a large number of small HDFS files. To date Spark has no good built-in solution for this (I am not sure how things stand in Spark 3.0).
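To see why the files pile up, here is a minimal sketch (the table name demo.t_events and the numbers are made up for illustration, they are not from the job below): every micro-batch writes with one task per DataFrame partition, and each task leaves at least one file in the target Hive partition, so a large partition count multiplied by frequent batches quickly turns into thousands of tiny files.

import org.apache.spark.sql.DataFrame

object SmallFileSketch {
  // Called once per micro-batch by a streaming job (driver code not shown).
  def writeBatch(batchDf: DataFrame): Unit = {
    batchDf
      .repartition(200)            // 200 tasks => at least 200 files appended per batch
      .write
      .mode("append")
      .insertInto("demo.t_events") // with a 1-minute batch interval: roughly 200 * 60 files per hourly partition
  }
}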
A workable compromise: partition the Hive table by hour, and after Spark Streaming has written into it, schedule a merge job at every hour boundary that compacts the small files of the previous hour's partition. The merge simply reads that partition back, uses Spark to repartition it down to a small number of partitions, and writes it back into the same partition; after the merge the partition holds exactly as many files as the Spark job has partitions. The code is below.
package com.ops

import java.text.SimpleDateFormat
import java.util.{Calendar, Date}

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

object MergeHiveFile {

  // Quiet Spark's own logging and keep an INFO-level logger for this job
  Logger.getLogger("org").setLevel(Level.ERROR)
  val logger: Logger = Logger.getLogger(MergeHiveFile.getClass)
  logger.setLevel(Level.INFO)
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("yarn")
      .appName("MergeHiveFile")
      .config("spark.sql.warehouse.dir", "hdfs://x.x.x.x:8020/user/hive/warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.debug.maxToStringFields", "100")
      .config("spark.kryoserializer.buffer.max", "128m")
      .enableHiveSupport()
      .getOrCreate()
    // Work out the previous hour's partition values (pday = yyyyMMdd, phour = HH)
    val df = new SimpleDateFormat("yyyyMMddHH")
    val currentDT = new Date()
    val calendar = Calendar.getInstance()
    calendar.setTime(currentDT)
    calendar.add(Calendar.HOUR, -1)
    val shouldMergeDT = df.format(new Date(calendar.getTimeInMillis))
    println(shouldMergeDT)
    val shouldMergeDayPartition = shouldMergeDT.substring(0, 8)
    val shouldMergeHourPartition = shouldMergeDT.substring(8, 10)
    // Read the previous hour's partition back and shrink it to 6 partitions,
    // so the rewritten partition ends up with 6 files
    spark.sql(
      s"""select
         |userid,
         |lal,
         |ts,
         |fpr,
         |rts,
         |wx,
         |url,
         |min_time,
         |avg_time,
         |max_time,
         |open_cnt,
         |stime,
         |file_time,
         |host,
         |etl_time,
         |diff_time
         |from front_logs.t_pp_nrt
         |where pday = '$shouldMergeDayPartition'
         |and phour = '$shouldMergeHourPartition'""".stripMargin)
      .repartition(6)
      .createOrReplaceTempView("tmp_t_pp_nrt")
spark.sql(s"insert overwrite table front_logs.t_pp_nrt partition(pday = '$shouldMergeDayPartition',phour = '$shouldMergeHourPartition')" +
s"select * from tmp_t_pp_nrt")
println(s"合并front_logs.t_po_nrt分区$shouldMergeDayPartition,$shouldMergeHourPartition 完成")
}
}
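As a quick sanity check after the overwrite, the file count of the merged partition can be verified with the HDFS API. The sketch below would sit at the end of main; the warehouse path is an assumption derived from the spark.sql.warehouse.dir configured above, not something stated in the original job.

    // Optional check (assumed warehouse layout): count the data files in the merged
    // partition; it should match the repartition count used above (6).
    import org.apache.hadoop.fs.{FileSystem, Path}
    val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
    val partitionDir = new Path(
      "hdfs://x.x.x.x:8020/user/hive/warehouse/front_logs.db/t_pp_nrt/" +
        s"pday=$shouldMergeDayPartition/phour=$shouldMergeHourPartition")
    val fileCount = fs.listStatus(partitionDir)
      .count(s => s.isFile && !s.getPath.getName.startsWith("_") && !s.getPath.getName.startsWith("."))
    println(s"files in merged partition: $fileCount")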