给 Databend 添加 Aggregate 函数 | 函数开发系例二
创始人
2024-05-24 08:25:58
0

在介绍 Scalar Function(给 Databend 添加 Scalar 函数 | 函数开发系例一) 后,我们来看 Aggregate Function。

Aggregate Function 用于对列的值进行操作并返回单个值。常见的 Agg Function 有 sum, count, avg 等。

函数注册

Agg Function 的 register 函数以小写的函数名和 AggregateFunctionDescription 类型为参数,每个被注册的函数都存入 case_insensitive_desc ( HashMap 结构) 中。

而 case_insensitive_combinator_desc 是为了存储一些组合函数,比如与 _if 组合的 count_if, sum_if 这类函数。

pub struct AggregateFunctionFactory {case_insensitive_desc: HashMap,case_insensitive_combinator_desc: Vec<(String, CombinatorDescription)>,
}
impl AggregateFunctionFactory {...
pub fn register(&mut self, name: &str, desc: AggregateFunctionDescription) {let case_insensitive_desc = &mut self.case_insensitive_desc;case_insensitive_desc.insert(name.to_lowercase(), desc);}...
}

每个被注册的函数都要实现 trait AggregateFunction 和 AggregateFunctionFeatures。其中 AggregateFunctionFeatures 和 Scalar 中的 FunctionProperty 比较类似,都是存储函数的一些特质。

pub type AggregateFunctionRef = Arc;
pub type AggregateFunctionCreator =Box, Vec) -> Result + Sync + Send>;
pub struct AggregateFunctionDescription {pub(crate) aggregate_function_creator: AggregateFunctionCreator,pub(crate) features: AggregateFunctionFeatures,
}

主要来看 trait AggregateFunction,这里面是 Agg Function 的构成。

函数构成

可以看到与 Scalar 直接使用一个 Struct 不同,[AggregateFunction](https://github.com/datafuselabs/databend/blob/d5e06af03ba0f99afdd6bdc974bf2f5c1c022db8/src/query/functions/src/aggregates/aggregate_function.rs) 是一个 trait。因为聚合函数是按 block 累加列中的数据,再累加过程中会产生一些中间结果。

因此 Aggregate Function 必须有初始状态,而且聚合过程中生成的结果也要是 mergeable (可合并) 和 serializable (可序列化) 的。

主要函数有:

  • name 表示被注册的函数的名字,比如 avg, sum 等等。

  • return_type 表示被注册的函数返回值的类型,同样的函数返回值可能会由于参数类型的不同而产生变化。比如 sum(int8) 参数为 i8 类型,但是返回返回值可能是 int64。

  • init_state 用来初始化聚合函数状态。

  • state_layout 用来表示 state 在内存中的大小和内存块的排列方式。

  • accumulate 用于SingleStateAggregator。也就是着整个块可以在单个状态下聚合,没有任何 keys。比如 select count(*) from t 此时查询中没有任何分组列的聚合,这时会调度 accumulate 函数。

  • accumulate_keys 则是用于 PartialAggregator。这里需要考虑 key 和 offset ,每个 key 代表一个唯一的内存地址,记为函数参数 place。

  • serialize 将聚合过程中的 state 序列化为二进制。

  • deserialize 从二进制反序列化为 state。

  • merge 用于合并其他 state 到当前 state。

  • merge_result 可以将 Aggregate Function state 合并成单个值。

示例

以 avg 为例

具体实现在 [aggregate_avg.rs](https://github.com/datafuselabs/databend/blob/d5e06af03ba0f99afdd6bdc974bf2f5c1c022db8/src/query/functions/src/aggregates/aggregate_avg.rs) 中。

因为我们需要累加每个值,并除以非 null 总行数。因此 avg function 被定义为 struct AggregateAvgFunction 。其中 T 和 SumT 是实现 [Number](https://github.com/datafuselabs/databend/blob/2aec38605eebb7f0e1717f7f54ec52ae0f2e530b/src/query/expression/src/types/number.rs) 的逻辑类型。

在聚合过程中 avg 会产生的中间状态值是 已经累加的值的总和 以及 已经扫描过的非 null 的行。因此 AggregateAvgState 可以被定义为如下结构。

#[derive(Serialize, Deserialize)]
struct AggregateAvgState {#[serde(bound(deserialize = "T: DeserializeOwned"))]pub value: T,pub count: u64,
}
  • return_type 设置为 Float64Type。比如 value = 3, count = 2, avg = value/count。

  • init_state 初始状态设置 value 为 T 的 default 值,count 为 0。

  • accumulate AggregateAvgState 的 count, value 分别对 block 中非 NULL 的行数和值进行累加。

  • accumulate_keys 通过 place.get::() 获取对应的状态值,并进行更新。

fn accumulate_keys(&self,places: &[StateAddr],offset: usize,columns: &[Column],_input_rows: usize,
) -> Result<()> {let darray = NumberType::::try_downcast_column(&columns[0]).unwrap();darray.iter().zip(places.iter()).for_each(|(c, place)| {let place = place.next(offset);let state = place.get::>();state.add(c.as_(), 1);});Ok(())
}

类似的聚合函数示例也可以参考

[sum](https://github.com/datafuselabs/databend/blob/d5e06af03ba0f99afdd6bdc974bf2f5c1c022db8/src/query/functions/src/aggregates/aggregate_sum.rs),

[count](https://github.com/datafuselabs/databend/blob/d5e06af03ba0f99afdd6bdc974bf2f5c1c022db8/src/query/functions/src/aggregates/aggregate_count.rs)。

函数测试

Unit Test

聚合函数相关单元测试在 [agg.rs](https://github.com/datafuselabs/databend/blob/d5e06af03ba0f99afdd6bdc974bf2f5c1c022db8/src/query/functions/tests/it/aggregates/agg.rs) 中。

Logic Test

Functions 相关的 logic 测试在 [tests/logictest/suites/base/02_function/](https://github.com/datafuselabs/databend/tree/d5e06af03ba0f99afdd6bdc974bf2f5c1c022db8/tests/sqllogictests/suites/query/02_function) 中。

关于 Databend

Databend 是一款开源、弹性、低成本,基于对象存储也可以做实时分析的新式数仓。期待您的关注,一起探索云原生数仓解决方案,打造新一代开源 Data Cloud。

  • Databend 文档:https://databend.rs/

  • Twitter:https://twitter.com/Datafuse_Labs

  • Slack:https://datafusecloud.slack.com/

  • Wechat:Databend

  • GitHub :https://github.com/datafuselabs/databend

相关内容

热门资讯

监控摄像头接入GB28181平... 流程简介将监控摄像头的视频在网站和APP中直播,要解决的几个问题是:1&...
Windows10添加群晖磁盘... 在使用群晖NAS时,我们需要通过本地映射的方式把NAS映射成本地的一块磁盘使用。 通过...
protocol buffer... 目录 目录 什么是protocol buffer 1.protobuf 1.1安装  1.2使用...
在Word、WPS中插入AxM... 引言 我最近需要写一些文章,在排版时发现AxMath插入的公式竟然会导致行间距异常&#...
【PdgCntEditor】解... 一、问题背景 大部分的图书对应的PDF,目录中的页码并非PDF中直接索引的页码...
Fluent中创建监测点 1 概述某些仿真问题,需要创建监测点,用于获取空间定点的数据࿰...
educoder数据结构与算法...                                                   ...
MySQL下载和安装(Wind... 前言:刚换了一台电脑,里面所有东西都需要重新配置,习惯了所...
修复 爱普生 EPSON L4... L4151 L4153 L4156 L4158 L4163 L4165 L4166 L4168 L4...
MFC文件操作  MFC提供了一个文件操作的基类CFile,这个类提供了一个没有缓存的二进制格式的磁盘...