Spark Sql 转换成Task执行 和 InsertIntoHiveTable写入hive表数据 源码分析
创始人
2024-05-25 12:03:11
0

1.3.1 InsertIntoHiveTable类源码解析

1.3.1.1 背景

请添加图片描述

读取数据,经过处理后,最终写入 hive表,这里研究下写入原理。抛出如下几个问题?

1、task处理完数据后,如何将数据放到表的location目录下?

2、这类写入表的task,是如何从spark sql 逻辑计划/物理计划 转化成 task启动的?

1.3.1.2 spark sql 逻辑计划/物理计划 如何转化成 task(回答问题2)

driver端 调试日志如下(定位InsertIntoHiveTable物理执行算子的run方法):

run:84, InsertIntoHiveTable (org.apache.spark.sql.hive.execution)
sideEffectResult$lzycompute:108, DataWritingCommandExec (org.apache.spark.sql.execution.command)
sideEffectResult:106, DataWritingCommandExec (org.apache.spark.sql.execution.command)
executeCollect:120, DataWritingCommandExec (org.apache.spark.sql.execution.command)  -- 物理执行计划触发执行
$anonfun$logicalPlan$1:229, Dataset (org.apache.spark.sql)    -----org.apache.spark.sql.Dataset#logicalPlan 
apply:-1, 76482793 (org.apache.spark.sql.Dataset$$Lambda$1656)
$anonfun$withAction$1:3618, Dataset (org.apache.spark.sql)
apply:-1, 277117675 (org.apache.spark.sql.Dataset$$Lambda$1657)
$anonfun$withNewExecutionId$5:100, SQLExecution$ (org.apache.spark.sql.execution)
apply:-1, 1668179857 (org.apache.spark.sql.execution.SQLExecution$$$Lambda$1665)
withSQLConfPropagated:160, SQLExecution$ (org.apache.spark.sql.execution)
$anonfun$withNewExecutionId$1:87, SQLExecution$ (org.apache.spark.sql.execution)
apply:-1, 216687255 (org.apache.spark.sql.execution.SQLExecution$$$Lambda$1658)
withActive:764, SparkSession (org.apache.spark.sql)
withNewExecutionId:64, SQLExecution$ (org.apache.spark.sql.execution)
withAction:3616, Dataset (org.apache.spark.sql)
:229, Dataset (org.apache.spark.sql)
$anonfun$ofRows$2:100, Dataset$ (org.apache.spark.sql)  --- org.apache.spark.sql.Dataset#ofRows
apply:-1, 2116006444 (org.apache.spark.sql.Dataset$$$Lambda$925)
withActive:764, SparkSession (org.apache.spark.sql)
ofRows:97, Dataset$ (org.apache.spark.sql)
$anonfun$sql$1:607, SparkSession (org.apache.spark.sql)
apply:-1, 1700143613 (org.apache.spark.sql.SparkSession$$Lambda$787)
withActive:764, SparkSession (org.apache.spark.sql)
sql:602, SparkSession (org.apache.spark.sql)   -- org.apache.spark.sql.SparkSession#sql  用户编写spark sql语句
main:50, SparkSqlHive$ (org.example.sparksql)
main:-1, SparkSqlHive (org.example.sparksql)

org.apache.spark.sql.Dataset#ofRows

  def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan, tracker: QueryPlanningTracker): DataFrame = sparkSession.withActive {// logicalPlan 解析了的逻辑计划,qe 已经生成了物理执行计划val qe = new QueryExecution(sparkSession, logicalPlan, tracker)qe.assertAnalyzed()new Dataset[Row](qe, RowEncoder(qe.analyzed.schema))}

org.apache.spark.sql.Dataset#logicalPlan

  @transient private[sql] val logicalPlan: LogicalPlan = {// For various commands (like DDL) and queries with side effects, we force query execution// to happen right away to let these side effects take place eagerly.val plan = queryExecution.analyzed match {case c: Command =>// queryExecution 已经生成完了的物理执行计划LocalRelation(c.output, withAction("command", queryExecution)(_.executeCollect()))case u @ Union(children) if children.forall(_.isInstanceOf[Command]) =>LocalRelation(u.output, withAction("command", queryExecution)(_.executeCollect()))case _ =>queryExecution.analyzed}if (sparkSession.sessionState.conf.getConf(SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED)) {plan.setTagValue(Dataset.DATASET_ID_TAG, id)}plan}

总结如下:

一段sql语句–》物理执行计划-》最底层算子提交 job(物理计划被执行) ->内部提交task-》被调度执行,

其中物理执行计划示例如下:

InsertIntoHiveTable 就是最底层的算子,该算子实现中会提交job

在这里插入图片描述

物理计划被执行触发点如下:

请添加图片描述

整体流程示意图如下:

在这里插入图片描述

物理执行计划中,InsertIntoHiveTable算子的run方法执行示意图如下:

注意该流程是在driver端执行,直到提交job

在这里插入图片描述
在这里插入图片描述

结合具体日志流程如下:

(1) 创建相关临时目录,提交task调度执行

在这里插入图片描述

(2)task执行,写入数据到.hive-staging_hive_*/-ext-10000目录(回答问题1
在这里插入图片描述

(3)将.hive-staging_hive*/-ext-10000目录的文件 写入 表location 目录, 更新元数据(回答问题1

在这里插入图片描述

1.3.1.3 task如何commit数据(详细回答问题1)

参考:https://www.jianshu.com/p/01ab5f0f22df

总结如下:

  1. 对于spark的InsertIntoHiveTable,结果rdd的每个partition的数据都有相应的task负责数据写入,而每个task都会在目标hive表的location目录下的.hive-staging_hive*/-ext-10000目录中创建相应的临时的staging目录,当前task的所有数据都会先写入到这个staging目录中;

  2. 当单个task写入完成后,会调用FileOutputCommitter.commitTask把task的staging目录下的数据文件都move到.hive-staging_hive*/-ext-10000下面,这个过程就是单个task的commit

  3. 当一个spark job的所有task都执行完成并commit成功后,spark会调用FileOutputCommitter.commitJob把临时的staging目录都删除掉,并创建_SUCCESS标记文件

  4. 当spark成功将数据都写入到staging_hive*/-ext-10000中 (也就是commitJob成功后),spark会调用hive的相应API把数据文件都move到目标hive表的location目录下,并更新hive meta data以enable新的hive partition

相关内容

热门资讯

监控摄像头接入GB28181平... 流程简介将监控摄像头的视频在网站和APP中直播,要解决的几个问题是:1&...
Windows10添加群晖磁盘... 在使用群晖NAS时,我们需要通过本地映射的方式把NAS映射成本地的一块磁盘使用。 通过...
protocol buffer... 目录 目录 什么是protocol buffer 1.protobuf 1.1安装  1.2使用...
在Word、WPS中插入AxM... 引言 我最近需要写一些文章,在排版时发现AxMath插入的公式竟然会导致行间距异常&#...
修复 爱普生 EPSON L4... L4151 L4153 L4156 L4158 L4163 L4165 L4166 L4168 L4...
【PdgCntEditor】解... 一、问题背景 大部分的图书对应的PDF,目录中的页码并非PDF中直接索引的页码...
Fluent中创建监测点 1 概述某些仿真问题,需要创建监测点,用于获取空间定点的数据࿰...
educoder数据结构与算法...                                                   ...
MySQL下载和安装(Wind... 前言:刚换了一台电脑,里面所有东西都需要重新配置,习惯了所...
MFC文件操作  MFC提供了一个文件操作的基类CFile,这个类提供了一个没有缓存的二进制格式的磁盘...