本文介绍Dataset的UDF的实现原理。UDF是User-Defined Functions的简写。用户可以根据自己的需要编写函数,并用于Spark SQL中。但也要注意,Spark不会优化UDF中的代码,若大量使用UDF可能让数据处理的性能受到影响,所以应该优先选择使用spark的api或sql语句来处理数据。
User-Defined Functions(又名 UDF)是 Spark SQL的一个特性,用户可以根据自己的需要来定义自己的基于列数据的处理逻辑。UDF扩展了Spark SQL的DSL语法,让它变得更加强大。
通过UDF可以让原来的SparkSQL变得更加强大,但由于它的灵活性,也带来了一些优化方面的问题。Spark的maillist中的作者写道:尽可能使用更高级别的标准基于列的函数(使用Dataset的标准API),因为UDF是 Spark SQL的黑匣子(无法推测UDF中的实现逻辑),所以,无法优化它们,甚至不会尝试去优化它们。
所以注意:在实现业务功能时,应该优先选择通过SparkSQL的API或SQL语句来实现。
自己注册udf函数:
val foo = udf(() => Math.random())
spark.udf.register("random", foo.asNondeterministic())val bar = udf(() => "bar")
spark.udf.register("stringLit", bar.asNonNullable())
查看已经注册的UDF:
scala> spark.catalog.listFunctions.filter("name=='random'").show()
+------+--------+-----------+--------------------+-----------+
| name|database|description| className|isTemporary|
+------+--------+-----------+--------------------+-----------+
|random| null| null|org.apache.spark....| true|
+------+--------+-----------+--------------------+-----------+
可以通过以下命令来显示className的全名:
spark.catalog.listFunctions.filter("name=='random'").show(false)
在SparkSession中定义了udf函数,该函数用来对UDF进行注册。
def udf: UDFRegistration = sessionState.udfRegistration
UDF的注册实现
在spark中提供了多种UDF的注册函数,可以注册不同类型的UDF。包括:python,多参数的。
一般的UDF注册的实现方式如下:
def register(name: String, udf: UserDefinedFunction): UserDefinedFunction = {def builder(children: Seq[Expression]) = udf.apply(children.map(Column.apply) : _*).exprfunctionRegistry.createOrReplaceTempFunction(name, builder)udf
}
在类FunctionRegistry中,createOrReplaceTempFunction函数的定义如下:
/* Create or replace a temporary function. */
final def createOrReplaceTempFunction(name: String, builder: FunctionBuilder): Unit = {registerFunction(FunctionIdentifier(name),builder)
}
注册UDF时,会定义一个FunctionIdentifier对象,然后把该对象添加到一个functionBuilders的hashMap中。
该接口的实现类用来管理用户的UDF。它有一个实现类:SimpleFunctionRegistry。UDF就是在该类的对象中进行管理的。在该类中,定义了一个HashMap用来保存注册的UDF,该变量的定义如下:
@GuardedBy("this")private val functionBuilders =new mutable.HashMap[FunctionIdentifier, (ExpressionInfo, FunctionBuilder)]
可以看到,key是FunctionIdentifier对象,这是UDF的标识类。value是组合:(ExpressionInfo,FunctionBuilder)。
有了基本的数据结构支撑,注册一个UDF就变得简单了,就是把UDF的信息插入到functionBuilders这个HashMap中。该函数的实现代码如下:
override def registerFunction(name: FunctionIdentifier,info: ExpressionInfo,builder: FunctionBuilder): Unit = synchronized {// 把UDF名和其信息保存到HashMap中functionBuilders.put(normalizeFuncName(name), (info, builder))
}
注意,在保存UDF时,其函数名会通过normalizeFuncName函数进行处理,其实就是把UDF的名称全部变成小写,所以,UDF是大小写无关的。
其中的ExpressionInfo类,描述了该UDF对应的数据库名,类名等详细信息。
而FunctionIdentifier是函数的标识类,该类主要封装了函数的名称,数据库名等信息,用来匹配相同的UDF。
通过执行该函数,就把UDF注册到一个HashMap中了,此时通过元数据对象就可以找到该UDF的相关信息了。
通过spark.udf函数会得到一个UserDefinedFunction对象。该对象代表一个创建的UDF。
实际使用时,会调用UserDefinedFunction#apply函数来处理数据。
@scala.annotation.varargsdef apply(exprs: Column*): Column = {// TODO: make sure this class is only instantiated through `SparkUserDefinedFunction.create()`// and `nullableTypes` is always set.if (nullableTypes.isEmpty) {nullableTypes = Some(ScalaReflection.getParameterTypeNullability(f))}// 定义了参数的数据类型if (inputTypes.isDefined) {assert(inputTypes.get.length == nullableTypes.get.length)}Column(ScalaUDF(f,dataType,exprs.map(_.expr),nullableTypes.get,inputTypes.getOrElse(Nil),udfName = _nameOption,nullable = _nullable,udfDeterministic = _deterministic))}
可以看到,最终会通过ScalaUDF来执行udf。
case class ScalaUDF(function: AnyRef,dataType: DataType,children: Seq[Expression],inputsNullSafe: Seq[Boolean],inputTypes: Seq[DataType] = Nil,udfName: Option[String] = None,nullable: Boolean = true,udfDeterministic: Boolean = true)extends Expression with ImplicitCastInputTypes with NonSQLExpression with UserDefinedExpression {// 根据udf的参数个数来执行不同的操作private[this] val f = children.size match {case 0 =>val func = function.asInstanceOf[() => Any](input: InternalRow) => {func()}case 1 =>val func = function.asInstanceOf[(Any) => Any]val child0 = children(0)lazy val converter0 = CatalystTypeConverters.createToScalaConverter(child0.dataType)// 遍历输入的每一行数据(input: InternalRow) => {func(converter0(child0.eval(input)))}// ...
}
从实现的代码可以看到,udf会遍历数据集的每一行,然后对每一行调用udf进行处理。而不管参数有多少个,udf的输出只有一列。
该类是UserDefinedFunction接口的实现类。该类会通过createScalaUDF来创建UDF,通过该函数会得到一个ScalaUDF对象。
聚合操作的UDF的实现类是:UserDefinedAggregator。
UDF让用户可以定自己的数据处理逻辑,提升了SparkSQL处理数据的灵活性,但也增加了优化执行计划的难度。在进行数据处理时,若能够使用标准API处理就尽量使用标准API来处理。
UDF是注册在元数据中的,大小写无关,可以重复注册,新注册的会覆盖老的UDF。
UDF的参数是列,可以是一列或多列。