Merging Hive table small files with Spark SQL

When Spark Streaming writes data into Hive, or when a Spark batch job runs with a large number of partitions, the Hive table ends up with many small HDFS files. So far Spark has no good built-in solution for this (I don't know how things stand in Spark 3.0).
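
To make the cause concrete, here is a minimal sketch (hypothetical output path and toy data, not from the original job) showing that a single write produces one part file per DataFrame partition; a streaming job repeating such an append every few seconds piles up thousands of small files in an hourly partition:

import org.apache.spark.sql.SparkSession

object SmallFileDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("SmallFileDemo").getOrCreate()
    import spark.implicits._

    // 200 partitions -> roughly 200 part files from this one write.
    val batch = (1 to 1000).toDF("id").repartition(200)

    // A streaming job appending a micro-batch like this every few seconds
    // accumulates thousands of such files per hour (the path is hypothetical).
    batch.write.mode("append").parquet("/tmp/small_file_demo")

    spark.stop()
  }
}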

A workaround: partition the Hive table by hour, and after Spark Streaming has filled an hour, schedule a merge job at each hour boundary that compacts the small files of the previous hour's partition. The job simply reads that partition's data back with Spark, repartitions it down to a small number of partitions, and writes it back over the original partition, so after the merge the number of files in the partition equals the number of Spark partitions used in the job. The code is below.

package com.ops

import java.text.SimpleDateFormat
import java.util.{Calendar, Date}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession


object MergeHiveFile {
  Logger.getLogger("org").setLevel(Level.ERROR)
  val logger = Logger.getLogger(MergeHiveFile.getClass)
  logger.setLevel(Level.INFO)
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
        .master("yarn")
        .appName("MergeHiveFile")
        .config("spark.sql.warehouse.dir", "hdfs://x.x.x.x:8020/user/hive/warehouse")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .config("spark.debug.maxToStringFields", "100")
        .config("spark.kryoserializer.buffer.max", "128")
        .config("spark.debug.maxToStringFields", "100")
        .enableHiveSupport()
        .getOrCreate()

    // Compute the previous hour (the partition to merge) as yyyyMMddHH.
    val df = new SimpleDateFormat("yyyyMMddHH")
    val currentDT = new Date()
    val calendar = Calendar.getInstance()
    calendar.setTime(currentDT)
    calendar.add(Calendar.HOUR, -1)
    val shouldMergeDT = df.format(new Date(calendar.getTimeInMillis))
    println(shouldMergeDT)
    // Split into the day partition (yyyyMMdd) and the hour partition (HH).
    val shouldMergeDayPartition = shouldMergeDT.substring(0, 8)
    val shouldMergeHourPartition = shouldMergeDT.substring(8, 10)

    // Read all columns of the previous hour's partition back into Spark.
    spark.sql(
      s"""select
         |userid,
         |lal,
         |ts,
         |fpr,
         |rts,
         |wx,
         |url,
         |min_time,
         |avg_time,
         |max_time,
         |open_cnt,
         |stime,
         |file_time,
         |host,
         |etl_time,
         |diff_time
         |from front_logs.t_pp_nrt
         |where pday = '$shouldMergeDayPartition'
         |and phour = '$shouldMergeHourPartition'""".stripMargin)
      // Repartitioning to 6 means the rewrite leaves 6 files in the partition.
      .repartition(6)
      .createOrReplaceTempView("tmp_t_pp_nrt")
    spark.sql(s"insert overwrite table front_logs.t_pp_nrt partition(pday = '$shouldMergeDayPartition',phour = '$shouldMergeHourPartition')" +
      s"select * from tmp_t_pp_nrt")
    println(s"合并front_logs.t_po_nrt分区$shouldMergeDayPartition,$shouldMergeHourPartition 完成")

  }
}
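
As a sanity check, a small helper like the one below can be run before and after the merge to print the file count and total size of a partition directory. This is only a sketch and assumes the default warehouse layout (front_logs.db/t_pp_nrt/pday=.../phour=...); adjust the path to your cluster:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object CountPartitionFiles {
  def main(args: Array[String]): Unit = {
    // args(0) = pday (yyyyMMdd), args(1) = phour (HH); the path layout is an assumption.
    val partitionDir = new Path(
      "hdfs://x.x.x.x:8020/user/hive/warehouse/front_logs.db/t_pp_nrt/" +
        s"pday=${args(0)}/phour=${args(1)}")
    val fs = FileSystem.get(partitionDir.toUri, new Configuration())
    val dataFiles = fs.listStatus(partitionDir).filter(_.isFile)
    println(s"${dataFiles.length} files, ${dataFiles.map(_.getLen).sum / 1024 / 1024} MB in $partitionDir")
  }
}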


