首页 星云 工具 资源 星选 资讯 热门工具
:

PDF转图片 完全免费 小红书视频下载 无水印 抖音视频下载 无水印 数字星空

图书《数据资产管理核心技术与应用》核心章节节选-3.1.2. 从Spark 执行计划中获取数据血缘

编程知识
2024年08月02日 14:22

本文节选自清华大学出版社出版的图书《数据资产管理核心技术与应用》,作者为张永清等著。

从Spark 执行计划中获取数据血缘

因为数据处理任务会涉及到数据的转换和处理,所以从数据任务中解析血缘也是获取数据血缘的渠道之一,Spark 是大数据中数据处理最常用的一个技术组件,既可以做实时任务的处理,也可以做离线任务的处理。Spark在执行每一条SQL语句的时候,都会生成一个执行计划,这一点和很多数据库的做法很类似,都是SQL语句在执行时,先生成执行计划。如下图3-1-10所示,在Spark的官方文档链接https://spark.apache.org/docs/latest/sql-ref-syntax-qry-explain.html#content中,有明确提到,可以根据EXPLAIN关键字来获取执行计划,这和很多数据库查看执行计划的方式很类似。

图3-1-10

Spark底层生成执行计划以及处理执行计划的过程如下图3-1-11所示。本文节选自清华大学出版社出版的图书《数据资产管理核心技术与应用》,作者为张永清等著。

 

图3-1-11

从图中可以看到,

1、 执行SQL语句或者Data Frame时,会先生成一个Unresolved Logical Plan,就是没有做过任何处理和分析的逻辑执行计划,仅仅会从SQL语法的角度做一些基础性的校验。

2、 之后通过获取Catalog的数据,对需要执行的SQL语句做表名、列名的进一步分析校验,从而生成一个可以直接运行的逻辑执行计划。

3、 但是Spark底层会有个优化器来生成一个最优的执行操作方式,从而生成一个优化后的最佳逻辑执行计划。

4、 将最终确定下来的逻辑执行计划转换为物理执行计划,转换为最终的代码进行执行。

Spark的执行计划其实就是数据处理的过程计划,会将SQL语句或者DataFrame 做解析,并且结合Catalog一起,生成最终数据转换和处理的代码。所以可以从Spark的执行计划中,获取到数据的转换逻辑,从而解析到数据的血缘。但是spark的执行计划都是在spark底层内部自动处理的,如何获取到每次Spark任务的执行计划的信息呢?其实在Spark底层有一套Listener的架构设计,可以通过Spark Listener 来获取到spark 底层很多执行的数据信息。

在spark的源码中,以Scala的形式提供了一个org.apache.spark.sql.util.QueryExecutionListener  trait (类似Java 语言的接口),来作为Spark SQL等任务执行的监听器。在org.apache.spark.sql.util.QueryExecutionListener  中提供了如下表3-1-2所示的两个方法。

表3-1-2

方法名

描述

def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit

执行成功时,调用的方法,其中包括了执行计划参数,这里的执行计划可以是逻辑计划或者物理计划

def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit

执行失败时,调用的方法,其中同样也包括了执行计划参数,这里的执行计划可以是逻辑计划或者物理计划

因此可以借用QueryExecutionListener  来主动让Spark在执行任务时,将执行计划信息推送到自己的系统或者数据库中,然后再做进一步的解析,如下图3-1-12所示。本文节选自清华大学出版社出版的图书《数据资产管理核心技术与应用》,作者为张永清等著。

图3-1-12

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener
case class PlanExecutionListener(sparkSession: SparkSession) extends QueryExecutionListener with Logging{

  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = withErrorHandling(qe) {
    // 执行成功时,调用解析执行计划的方法
    planParser(qe)
  }

  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = withErrorHandling(qe) {

  }

  private def withErrorHandling(qe: QueryExecution)(body: => Unit): Unit = {
    try
      body
    catch {
      case NonFatal(e) =>
        val ctx = qe.sparkSession.sparkContext
        logError(s"Unexpected error occurred during lineage processing for application: ${ctx.appName} #${ctx.applicationId}", e)
    }
  }


  def planParser(qe: QueryExecution): Unit = {
    logInfo("----------- start to get spark  analyzed LogicPlan--------")
      //解析执行计划,并且将执行计划的数据发送到自有的系统或者数据库中
      ......
  }
} 

上面的代码中,实现了QueryExecutionListener 这个trait中的onSuccess和onFailure这两个方法,只有在onSuccess时,才需要获取执行计划的数据,因为只有onSuccess时的血缘才是有效的。

实现好了自定义的QueryExecutionListener后,可以通过sparkSession.listenerManager.register来将自己实现的PlanExecutionListener 注册到Spark会话中,listenerManager是Spark中Listener的管理器。

在获取到执行计划时,需要再结合Catalog一起,来进一步解析血缘的数据,如下图3-1-13所示

图3-1-13

Spark 中常见的执行计划实现类如下表3-1-3所示,获取数据血缘时,就是需要从如下的这些执行计划中解析血缘关系。本文节选自清华大学出版社出版的图书《数据资产管理核心技术与应用》,作者为张永清等著。

表3-1-3

执行计划实现类

描述

org.apache.spark.sql.execution.datasources.LogicalRelation

一般用于解析字段级的关联关系

org.apache.spark.sql.catalyst.catalog.HiveTableRelation

Hive 表关联关系的执行计划,一般用于SQL执行时,存在关联查询的情况会出现该执行计划。

org.apache.spark.sql.hive.execution.InsertIntoHiveTable

一般是在执行insert into 的SQL 语句时才会产生的执行计划,例如insert into xxx_table(colum1,column2) values("4","zhangsan")

org.apache.spark.sql.execution.datasources

.InsertIntoHadoopFsRelationCommand

一般用于执行类似    sparkSession

      .read

      .table("xx_source_table ")

      .limit(10)

      .write

      .mode(SaveMode.Append)

      .insertInto("xx_target_table ")产生的执行计划。

org.apache.spark.sql.hive.execution.

CreateHiveTableAsSelectCommand

一般是在执行create table xxx_table as的SQL 语句时才会产生的执行计划,例如create table xx_target_table as select * from xx_source_table

org.apache.spark.sql.execution.command

.CreateDataSourceTableAsSelectCommand

一般用于执行类似sparkSession

      .read

      .table("xx_source_table")

      .limit(10)

      .write

      .mode(SaveMode.Append)

      .saveAsTable("xx_target_table")产生的执行计划。

org.apache.spark.sql.execution.datasources

.InsertIntoDataSourceCommand

一般用于将SQL查询结果写入到一张表中,比如insert into xxx_target_table select * from xxx_source_table

 

如下是以org.apache.spark.sql.execution.datasources

.InsertIntoHadoopFsRelationCommand 为例的spark 执行计划的数据,如下数据已经将原始的执行计划转换为了json格式的数据,方便做展示。

.................更多内容,请参考清华大学出版社出版的图书《数据资产管理核心技术与应用》,作者为张永清等著

 

 

From:https://www.cnblogs.com/laoqing/p/18338683
本文地址: http://www.shuzixingkong.net/article/709
0评论
提交 加载更多评论
其他文章 stable diffusion 实践与测试
stable diffusion 实践与测试 放大 原图高清放大 原始图片 当不满意图片质量的时候 使用stable diffusion进行二次处理 选择适合图片风格的模型,再次根据图片写出提示词 输入原图像1024尺寸之后调整重绘幅度 采样器automatic在这里会选择karras 原图异变放大
stable diffusion 实践与测试 stable diffusion 实践与测试 stable diffusion 实践与测试
无缝融入,即刻智能[1]:MaxKB知识库问答系统,零编码嵌入第三方业务系统,定制专属智能方案,用户满意度飙升
无缝融入,即刻智能[1]:MaxKB知识库问答系统,零编码嵌入第三方业务系统,定制专属智能方案,用户满意度飙升
无缝融入,即刻智能[1]:MaxKB知识库问答系统,零编码嵌入第三方业务系统,定制专属智能方案,用户满意度飙升 无缝融入,即刻智能[1]:MaxKB知识库问答系统,零编码嵌入第三方业务系统,定制专属智能方案,用户满意度飙升 无缝融入,即刻智能[1]:MaxKB知识库问答系统,零编码嵌入第三方业务系统,定制专属智能方案,用户满意度飙升
Apache COC闪电演讲总结【OSGraph】
与一般的演讲不同,lightning talk主打一个字就是“快”,如何在5min内给听众表达清楚你的观点,就很具挑战性了,整个一开源版的“电梯一分钟”。
Apache COC闪电演讲总结【OSGraph】 Apache COC闪电演讲总结【OSGraph】 Apache COC闪电演讲总结【OSGraph】
使用Cython调用CUDA Kernel函数
从Python接口调用GPU进行加速的方案有很多,包括Cupy和PyCuda以及之前介绍过的Numba,还可以使用MindSpore、PyTorch和Jax等成熟的深度学习框架,这里介绍了一种直接写CUDA Kernel函数的方案。为了能够做到CUDA-C和Python编程的分离,这里引入了Cyth
Label Studio数据标注--通过源码安装
Label Studio是Heartex公司开发的一款在线数据标注工具,本文就介绍如何从github上clone源码安装Label Studio。
Kotlin 字符串教程:深入理解与使用技巧
Kotlin中的字符串用于存储文本,定义时使用双引号包围字符序列,如`var greeting = "Hello"`。Kotlin能自动推断变量类型,但在未初始化时需显式指定类型,如`var name: String`。可通过索引访问字符串元素,如`txt[0]`获取首字符。字符
Zabbix搭建
目录zabbix搭建1. 前置环境准备1.1 搭建LNMP2. zabbix准备工作2.1 安装php模块以及编译所需工具2.2 修改php配置2.3 编译安装zabbix2.3.1 下载tar包2.3.2 解压2.3.3 创建用户/组2.3.4 开始安装2.3.5 配置数据库3. 配置zabbix
Zabbix搭建 Zabbix搭建 Zabbix搭建
argparse学习笔记
argparse是 Python 的一个内置模块,用于编写用户友好的命令行接口。使用 argparse,你可以很容易地为 Python 脚本添加参数解析功能,使得脚本可以接受命令行选项和参数。学起来也比较简单,接下来我会详细讲解。 在argparse里面,有位置参数和可选参数。位置参数是必不可少的,