SparkSQL实现原理-UDF实现原理分析
创始人
2024-06-02 09:39:35
0

SparkSQL实现原理-UDF实现原理分析

概述

本文介绍Dataset的UDF的实现原理。UDF是User-Defined Functions的简写。用户可以根据自己的需要编写函数,并用于Spark SQL中。但也要注意,Spark不会优化UDF中的代码,若大量使用UDF可能让数据处理的性能受到影响,所以应该优先选择使用spark的api或sql语句来处理数据。

什么是UDF

User-Defined Functions(又名 UDF)是 Spark SQL的一个特性,用户可以根据自己的需要来定义自己的基于列数据的处理逻辑。UDF扩展了Spark SQL的DSL语法,让它变得更加强大。

通过UDF可以让原来的SparkSQL变得更加强大,但由于它的灵活性,也带来了一些优化方面的问题。Spark的maillist中的作者写道:尽可能使用更高级别的标准基于列的函数(使用Dataset的标准API),因为UDF是 Spark SQL的黑匣子(无法推测UDF中的实现逻辑),所以,无法优化它们,甚至不会尝试去优化它们。

所以注意:在实现业务功能时,应该优先选择通过SparkSQL的API或SQL语句来实现。

UDF的使用

自己注册udf函数:

val foo = udf(() => Math.random())
spark.udf.register("random", foo.asNondeterministic())val bar = udf(() => "bar")
spark.udf.register("stringLit", bar.asNonNullable())

查看已经注册的UDF:

scala> spark.catalog.listFunctions.filter("name=='random'").show()
+------+--------+-----------+--------------------+-----------+
|  name|database|description|           className|isTemporary|
+------+--------+-----------+--------------------+-----------+
|random|    null|       null|org.apache.spark....|       true|
+------+--------+-----------+--------------------+-----------+

可以通过以下命令来显示className的全名:

spark.catalog.listFunctions.filter("name=='random'").show(false)

UDF的实现

UDF的注册

在SparkSession中定义了udf函数,该函数用来对UDF进行注册。

def udf: UDFRegistration = sessionState.udfRegistration

UDF的注册实现

在spark中提供了多种UDF的注册函数,可以注册不同类型的UDF。包括:python,多参数的。

一般的UDF注册的实现方式如下:

def register(name: String, udf: UserDefinedFunction): UserDefinedFunction = {def builder(children: Seq[Expression]) = udf.apply(children.map(Column.apply) : _*).exprfunctionRegistry.createOrReplaceTempFunction(name, builder)udf
}

在类FunctionRegistry中,createOrReplaceTempFunction函数的定义如下:

/* Create or replace a temporary function. */
final def createOrReplaceTempFunction(name: String, builder: FunctionBuilder): Unit = {registerFunction(FunctionIdentifier(name),builder)
}

注册UDF时,会定义一个FunctionIdentifier对象,然后把该对象添加到一个functionBuilders的hashMap中。

函数注册:FunctionRegistry

该接口的实现类用来管理用户的UDF。它有一个实现类:SimpleFunctionRegistry。UDF就是在该类的对象中进行管理的。在该类中,定义了一个HashMap用来保存注册的UDF,该变量的定义如下:

  @GuardedBy("this")private val functionBuilders =new mutable.HashMap[FunctionIdentifier, (ExpressionInfo, FunctionBuilder)]

可以看到,key是FunctionIdentifier对象,这是UDF的标识类。value是组合:(ExpressionInfo,FunctionBuilder)。

注册函数:registerFunction

有了基本的数据结构支撑,注册一个UDF就变得简单了,就是把UDF的信息插入到functionBuilders这个HashMap中。该函数的实现代码如下:

override def registerFunction(name: FunctionIdentifier,info: ExpressionInfo,builder: FunctionBuilder): Unit = synchronized {// 把UDF名和其信息保存到HashMap中functionBuilders.put(normalizeFuncName(name), (info, builder))
}

注意,在保存UDF时,其函数名会通过normalizeFuncName函数进行处理,其实就是把UDF的名称全部变成小写,所以,UDF是大小写无关的。

其中的ExpressionInfo类,描述了该UDF对应的数据库名,类名等详细信息。

而FunctionIdentifier是函数的标识类,该类主要封装了函数的名称,数据库名等信息,用来匹配相同的UDF。

通过执行该函数,就把UDF注册到一个HashMap中了,此时通过元数据对象就可以找到该UDF的相关信息了。

UDF的注册对象:UserDefinedFunction

通过spark.udf函数会得到一个UserDefinedFunction对象。该对象代表一个创建的UDF。

UDF实际作用

实际使用时,会调用UserDefinedFunction#apply函数来处理数据。

  @scala.annotation.varargsdef apply(exprs: Column*): Column = {// TODO: make sure this class is only instantiated through `SparkUserDefinedFunction.create()`// and `nullableTypes` is always set.if (nullableTypes.isEmpty) {nullableTypes = Some(ScalaReflection.getParameterTypeNullability(f))}// 定义了参数的数据类型if (inputTypes.isDefined) {assert(inputTypes.get.length == nullableTypes.get.length)}Column(ScalaUDF(f,dataType,exprs.map(_.expr),nullableTypes.get,inputTypes.getOrElse(Nil),udfName = _nameOption,nullable = _nullable,udfDeterministic = _deterministic))}

可以看到,最终会通过ScalaUDF来执行udf。

case class ScalaUDF(function: AnyRef,dataType: DataType,children: Seq[Expression],inputsNullSafe: Seq[Boolean],inputTypes: Seq[DataType] = Nil,udfName: Option[String] = None,nullable: Boolean = true,udfDeterministic: Boolean = true)extends Expression with ImplicitCastInputTypes with NonSQLExpression with UserDefinedExpression {// 根据udf的参数个数来执行不同的操作private[this] val f = children.size match {case 0 =>val func = function.asInstanceOf[() => Any](input: InternalRow) => {func()}case 1 =>val func = function.asInstanceOf[(Any) => Any]val child0 = children(0)lazy val converter0 = CatalystTypeConverters.createToScalaConverter(child0.dataType)// 遍历输入的每一行数据(input: InternalRow) => {func(converter0(child0.eval(input)))}// ...
}

从实现的代码可以看到,udf会遍历数据集的每一行,然后对每一行调用udf进行处理。而不管参数有多少个,udf的输出只有一列。

SparkUserDefinedFunction

该类是UserDefinedFunction接口的实现类。该类会通过createScalaUDF来创建UDF,通过该函数会得到一个ScalaUDF对象。

聚合操作的UDF的实现类是:UserDefinedAggregator。

总结

  1. UDF让用户可以定自己的数据处理逻辑,提升了SparkSQL处理数据的灵活性,但也增加了优化执行计划的难度。在进行数据处理时,若能够使用标准API处理就尽量使用标准API来处理。

  2. UDF是注册在元数据中的,大小写无关,可以重复注册,新注册的会覆盖老的UDF。

  3. UDF的参数是列,可以是一列或多列。

相关内容

热门资讯

监控摄像头接入GB28181平... 流程简介将监控摄像头的视频在网站和APP中直播,要解决的几个问题是:1&...
Windows10添加群晖磁盘... 在使用群晖NAS时,我们需要通过本地映射的方式把NAS映射成本地的一块磁盘使用。 通过...
protocol buffer... 目录 目录 什么是protocol buffer 1.protobuf 1.1安装  1.2使用...
Fluent中创建监测点 1 概述某些仿真问题,需要创建监测点,用于获取空间定点的数据࿰...
educoder数据结构与算法...                                                   ...
MySQL下载和安装(Wind... 前言:刚换了一台电脑,里面所有东西都需要重新配置,习惯了所...
MFC文件操作  MFC提供了一个文件操作的基类CFile,这个类提供了一个没有缓存的二进制格式的磁盘...
在Word、WPS中插入AxM... 引言 我最近需要写一些文章,在排版时发现AxMath插入的公式竟然会导致行间距异常&#...
有效的括号 一、题目 给定一个只包括 '(',')','{','}'...
【Ctfer训练计划】——(三... 作者名:Demo不是emo  主页面链接:主页传送门 创作初心ÿ...