Spark Sql 转换成Task执行 和 InsertIntoHiveTable写入hive表数据 源码分析

  • Post author:
  • Post category:其他




1.3.1 InsertIntoHiveTable类源码解析



1.3.1.1 背景

请添加图片描述

读取数据,经过处理后,最终写入 hive表,这里研究下写入原理。抛出如下几个问题?

1、task处理完数据后,如何将数据放到表的location目录下?

2、这类写入表的task,是如何从spark sql 逻辑计划/物理计划 转化成 task启动的?



1.3.1.2 spark sql 逻辑计划/物理计划 如何转化成 task(回答问题2)

driver端 调试日志如下(

定位InsertIntoHiveTable物理执行算子的run方法

):

run:84, InsertIntoHiveTable (org.apache.spark.sql.hive.execution)
sideEffectResult$lzycompute:108, DataWritingCommandExec (org.apache.spark.sql.execution.command)
sideEffectResult:106, DataWritingCommandExec (org.apache.spark.sql.execution.command)
executeCollect:120, DataWritingCommandExec (org.apache.spark.sql.execution.command)  -- 物理执行计划触发执行
$anonfun$logicalPlan$1:229, Dataset (org.apache.spark.sql)    -----org.apache.spark.sql.Dataset#logicalPlan 
apply:-1, 76482793 (org.apache.spark.sql.Dataset$$Lambda$1656)
$anonfun$withAction$1:3618, Dataset (org.apache.spark.sql)
apply:-1, 277117675 (org.apache.spark.sql.Dataset$$Lambda$1657)
$anonfun$withNewExecutionId$5:100, SQLExecution$ (org.apache.spark.sql.execution)
apply:-1, 1668179857 (org.apache.spark.sql.execution.SQLExecution$$$Lambda$1665)
withSQLConfPropagated:160, SQLExecution$ (org.apache.spark.sql.execution)
$anonfun$withNewExecutionId$1:87, SQLExecution$ (org.apache.spark.sql.execution)
apply:-1, 216687255 (org.apache.spark.sql.execution.SQLExecution$$$Lambda$1658)
withActive:764, SparkSession (org.apache.spark.sql)
withNewExecutionId:64, SQLExecution$ (org.apache.spark.sql.execution)
withAction:3616, Dataset (org.apache.spark.sql)
<init>:229, Dataset (org.apache.spark.sql)
$anonfun$ofRows$2:100, Dataset$ (org.apache.spark.sql)  --- org.apache.spark.sql.Dataset#ofRows
apply:-1, 2116006444 (org.apache.spark.sql.Dataset$$$Lambda$925)
withActive:764, SparkSession (org.apache.spark.sql)
ofRows:97, Dataset$ (org.apache.spark.sql)
$anonfun$sql$1:607, SparkSession (org.apache.spark.sql)
apply:-1, 1700143613 (org.apache.spark.sql.SparkSession$$Lambda$787)
withActive:764, SparkSession (org.apache.spark.sql)
sql:602, SparkSession (org.apache.spark.sql)   -- org.apache.spark.sql.SparkSession#sql  用户编写spark sql语句
main:50, SparkSqlHive$ (org.example.sparksql)
main:-1, SparkSqlHive (org.example.sparksql)

org.apache.spark.sql.Dataset#ofRows

  def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan, tracker: QueryPlanningTracker)
    : DataFrame = sparkSession.withActive {
    // logicalPlan 解析了的逻辑计划,qe 已经生成了物理执行计划
    val qe = new QueryExecution(sparkSession, logicalPlan, tracker)
    qe.assertAnalyzed()
    new Dataset[Row](qe, RowEncoder(qe.analyzed.schema))
  }

org.apache.spark.sql.Dataset#logicalPlan

  @transient private[sql] val logicalPlan: LogicalPlan = {
    // For various commands (like DDL) and queries with side effects, we force query execution
    // to happen right away to let these side effects take place eagerly.
    val plan = queryExecution.analyzed match {
      case c: Command =>
        // queryExecution 已经生成完了的物理执行计划
        LocalRelation(c.output, withAction("command", queryExecution)(_.executeCollect()))
      case u @ Union(children) if children.forall(_.isInstanceOf[Command]) =>
        LocalRelation(u.output, withAction("command", queryExecution)(_.executeCollect()))
      case _ =>
        queryExecution.analyzed
    }
    if (sparkSession.sessionState.conf.getConf(SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED)) {
      plan.setTagValue(Dataset.DATASET_ID_TAG, id)
    }
    plan
  }

总结如下:

一段sql语句–》物理执行计划-》最底层算子提交 job(物理计划被执行) ->内部提交task-》被调度执行,

其中物理执行计划示例如下:

InsertIntoHiveTable 就是最底层的算子,该算子实现中会提交job

在这里插入图片描述

物理计划被执行触发点如下:

请添加图片描述

整体流程示意图如下:

在这里插入图片描述

物理执行计划中,InsertIntoHiveTable算子的run方法执行示意图如下:

注意

该流程是在driver端执行

,直到提交job

在这里插入图片描述

在这里插入图片描述

结合具体日志流程如下:

(1) 创建相关临时目录,提交task调度执行

在这里插入图片描述

(2)task执行,写入数据到.hive-staging_hive_*/-ext-10000目录(

回答问题1



在这里插入图片描述

(3)将.hive-staging_hive*/-ext-10000目录的文件 写入 表location 目录, 更新元数据(

回答问题1

在这里插入图片描述



1.3.1.3 task如何commit数据(详细回答问题1)

参考:https://www.jianshu.com/p/01ab5f0f22df

总结如下:

  1. 对于spark的InsertIntoHiveTable,结果rdd的每个partition的数据都有相应的task负责数据写入,而每个task都会在目标hive表的location目录下的.hive-staging_hive*/-ext-10000目录中创建相应的临时的staging目录,当前task的所有数据都会先写入到这个staging目录中;

  2. 当单个task写入完成后,会调用FileOutputCommitter.commitTask把task的staging目录下的数据文件都move到.hive-staging_hive*/-ext-10000下面,

    这个过程就是单个task的commit

  3. 当一个spark job的所有task都执行完成并commit成功后,spark会调用FileOutputCommitter.commitJob把

    临时的staging目录都删除掉,并创建_SUCCESS标记文件

  4. 当spark成功将数据都写入到staging_hive*/-ext-10000中 (

    也就是commitJob成功后

    ),spark会调用

    hive的相应API把数据文件都move到目标hive表的location目录

    下,并

    更新hive meta data

    以enable新的hive partition



版权声明:本文为u014034497原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。