Flink源码解析(一、source原理)
创始人
2024-02-20 10:38:18
0

文章目录

  • 背景
  • 逻辑原理
    • connector架构
    • sql处理阶段
  • 代码实例
  • 代码debug
  • 参考文献

背景

source/sink 是flink最核心的部分之一,通过对其实现原理的学习,结合源码debug,有助于加深对框架处理过程的理解,以及架构设计上的提升。

逻辑原理

如果我们对自己对接一个数据源,核心的话就是连接器connector,比如关系型数据库就是jdbc。

connector架构

flink官方connector的架构如下
在这里插入图片描述

  • MetaData
    将 sql create source table 转化为实际的 CatalogTable,对应代码RelNode
  • Planning
    创建 RelNode 的过程中使用 SPI 将所有的 source(DynamicTableSourceFactory)\sink(DynamicTableSinkFactory) 工厂动态加载,获取到 connector = kafka,然后从所有 source 工厂中过滤出名称为 kafka 并且 继承自 DynamicTableSourceFactory.class 的工厂类 KafkaDynamicTableFactory,使用 KafkaDynamicTableFactory 创建出 KafkaDynamicSource
  • Runtime
    KafkaDynamicSource 创建出 FlinkKafkaConsumer,负责flink程序实际运行。

sql处理阶段

因为文章采用flink sql作为实例,所以先了解下sql在集群中经历的大致步骤,后续结合源码有助理解。
在这里插入图片描述
从图中可以看出,一段查询 SQL / 使用TableAPI 编写的程序(以下简称 TableAPI 代码)从输入到编译为可执行的 JobGraph 主要经历如下几个阶段

  1. 将 SQL文本 / TableAPI 代码转化为逻辑执行计划(Logical Plan)
  2. Logical Plan 通过优化器优化为物理执行计划(Physical Plan)
  3. 通过代码生成技术生成 Transformations 后进一步编译为可执行的 JobGraph 提交运行

代码实例

版本flink-1.13.1

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;public class KafkaSourceTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());env.setParallelism(1);EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);tEnv.executeSql("CREATE TABLE KafkaSourceTable (\n"+ "  `f0` STRING,\n"+ "  `f1` STRING\n"+ ") WITH (\n"+ "  'connector' = 'kafka',\n"+ "  'topic' = 'topic',\n"+ "  'properties.bootstrap.servers' = 'localhost:9092',\n"+ "  'properties.group.id' = 'testGroup',\n"+ "  'format' = 'json'\n"+ ")");Table t = tEnv.sqlQuery("SELECT * FROM KafkaSourceTable");tEnv.toAppendStream(t, Row.class).print();env.execute();}
}

代码debug

  1. 从tEnv.sqlQuery方法断点进入在这里插入图片描述

  2. 解析sql语法
    后面回根据解析返回的操作表类型创建对应的Table
    在这里插入图片描述

  3. parse主要工作
    获取语法解析器parser,查询计划实现类planner。 将sql语句解析成生成AST抽象语法树SqlNode(实际SqlSelector),之后调用convert转换方法。
    在这里插入图片描述

  4. convert处理
    首先validate验证SqlNode的正确性。
    在这里插入图片描述
    之后根据sql kind为QUERY进入converter.convertSqlQuery方法
    在这里插入图片描述

  5. convertSQLQuery处理
    生成逻辑计划,作用是SqlNode–>RelNode。
    在这里插入图片描述

  6. rel方法
    调用sqlToRelConverter.convertQuery方法。
    在这里插入图片描述
    真正的实现是在 convertQueryRecursive() 方法中完成的。
    在这里插入图片描述
    实际根据kind调用convertSelect方法
    在这里插入图片描述

  7. 调用convertIdentifier
    这中间过程省略一部分,实际调用到convertIdentifier方法。参数BlackBoard是对select进行转换时的一个临时工作空间,可以临时记录下转换过程中需要的信息,比如select依赖的scope、当前的root节点、当前节点是否是top节点等。这里还会创建CatalogSourceTable 类,此类继承自 FlinkPreparingTableBase,负责将 Calcite 的 RelOptTable转化为TableSourceTable
    在这里插入图片描述

  8. toRel
    这里会根据指定的connector,创建对应的tableSource,就和我们connector架构部分关联上了。发现 tableSource 已经是 KafkaDynamicSource。另外可以发现创建table source参数catalogTable,包含了所有 sql create source table 中信息的 catalogTable 变量传入了。
    在这里插入图片描述

  9. createDynamicTableSource
    使用 SPI 将所有的 source(DynamicTableSourceFactory)\sink(DynamicTableSinkFactory) 工厂动态加载,然后根据factoryClass过滤出KafkaDynamicTableFactory
    在这里插入图片描述
    10.createTableSource
    使用 kafka 工厂对象创建出 kafka source。在这里插入图片描述

  10. 获取format
    进入factory具体实现可以看到 KafkaDynamicTableFactory.createDynamicTableSource 中调用 KafkaDynamicTableFactory.createKafkaTableSource 来创建 KafkaDynamicSource。 另外这里还有一个重要点就是获取key value反序列化schema
    在这里插入图片描述
    在这里插入图片描述
    spi机制获取factory后,通过参数中的format=json过滤。
    在这里插入图片描述


参考文献

https://developer.aliyun.com/article/765311
https://cloud.tencent.com/developer/article/1864657

相关内容

热门资讯

监控摄像头接入GB28181平... 流程简介将监控摄像头的视频在网站和APP中直播,要解决的几个问题是:1&...
Windows10添加群晖磁盘... 在使用群晖NAS时,我们需要通过本地映射的方式把NAS映射成本地的一块磁盘使用。 通过...
protocol buffer... 目录 目录 什么是protocol buffer 1.protobuf 1.1安装  1.2使用...
Fluent中创建监测点 1 概述某些仿真问题,需要创建监测点,用于获取空间定点的数据࿰...
educoder数据结构与算法...                                                   ...
MySQL下载和安装(Wind... 前言:刚换了一台电脑,里面所有东西都需要重新配置,习惯了所...
MFC文件操作  MFC提供了一个文件操作的基类CFile,这个类提供了一个没有缓存的二进制格式的磁盘...
在Word、WPS中插入AxM... 引言 我最近需要写一些文章,在排版时发现AxMath插入的公式竟然会导致行间距异常&#...
有效的括号 一、题目 给定一个只包括 '(',')','{','}'...
【Ctfer训练计划】——(三... 作者名:Demo不是emo  主页面链接:主页传送门 创作初心ÿ...