
一、引言Flink SQL 作为声明式 API其从用户编写 SQL 语句到最终物理执行需要经历一套独立的编译流水线SQL 解析 → 逻辑计划 → 优化计划 → 物理计划 → Transformation → StreamGraph → JobGraph → ExecutionGraph → 物理执行图。相比 DataStream API 直接构建 Transformation 的方式Flink SQL 在前端多了一整套基于 Apache Calcite 的查询优化体系但在 StreamGraph 之后两者殊途同归、共享同一套运行时本文将完整拆解 Flink SQL 的编译执行链路。二、全局视角Flink SQL 的完整编译链路三、第一阶段SQL 解析ParsingFlink SQL 使用Apache Calcite的 Parser 模块将 SQL 文本解析为抽象语法树AST表示为SqlNode树结构。// 简化的内部调用链 TableEnvironment.executeSql(INSERT INTO ...) → Parser.parse(sqlString) → Calcite SqlParser.parseStmt() → 生成 SqlNode 树Flink 对 Calcite 的标准 SQL 语法进行了扩展支持 Flink 特有的语法如CREATE TABLE ... WITH (...) — connector 属性TUMBLE / HOP / SESSION — 窗口 TVFMATCH_RECOGNIZE — CEP 模式匹配STATEMENT SET — 多 Sink 语句核心数据结构SqlNode 树示例SELECT user_id, COUNT(*) FROM orders GROUP BY user_id: SqlSelect ├── selectList: [SqlIdentifier(user_id), SqlBasicCall(COUNT, *)] ├── from: SqlIdentifier(orders) ├── where: null ├── groupBy: SqlNodeList[SqlIdentifier(user_id)] └── ...该阶段的作用语法正确性检查拼写错误、语法结构错误等在此阶段报错将文本 SQL 转化为结构化的内存表示为后续语义分析做准备四、第二阶段语义校验ValidationValidator 利用Catalog元数据中心对 SqlNode 进行语义校验┌─────────────────────────────────────────────────────┐ │ Validator 校验内容 │ ├─────────────────────────────────────────────────────┤ │ 1. 表是否存在查询 Catalog │ │ 2. 列名是否存在、类型是否匹配 │ │ 3. 函数是否已注册、参数类型是否正确 │ │ 4. 窗口定义是否合法 │ │ 5. INSERT 目标表的 Schema 与 SELECT 是否兼容 │ │ 6. 数据类型的隐式转换和强制转换合法性 │ └─────────────────────────────────────────────────────┘Catalog 体系┌─────────────────────────────────────────┐ │ CatalogManager │ │ ┌─────────────────────────────────┐ │ │ │ Catalog (如 HiveCatalog) │ │ │ │ ┌───────────────────────────┐ │ │ │ │ │ Database │ │ │ │ │ │ ┌─────────────────────┐ │ │ │ │ │ │ │ Table (Schema 定义) │ │ │ │ │ │ │ │ - 列名、类型 │ │ │ │ │ │ │ │ - 主键、水位线 │ │ │ │ │ │ │ │ - Connector 属性 │ │ │ │ │ │ │ └─────────────────────┘ │ │ │ │ │ └───────────────────────────┘ │ │ │ └─────────────────────────────────┘ │ └─────────────────────────────────────────┘该阶段的作用注册 UDFCREATE FUNCTION使 Validator 能识别自定义函数配置 Catalog 元数据CREATE TABLE DDL 或 HiveCatalog 集成设置类型系统相关的配置如 table.exec.legacy-cast-behaviour五、第三阶段逻辑计划生成Logical Planning通过 Calcite 的SqlToRelConverter将校验后的 SqlNode 转换为关系代数表达式树RelNode。SQL: SELECT user_id, COUNT(*) AS cnt FROM orders GROUP BY user_id 逻辑计划RelNode 树: LogicalProject(user_id[$0], cnt[$1]) └── LogicalAggregate(group[{0}], cnt[COUNT()]) └── LogicalProject(user_id[$0]) └── LogicalTableScan(table[[default_catalog, default_database, orders]])核心概念概念说明RelNode关系代数节点如 Project、Filter、Aggregate、Join、Sort 等RexNode行表达式节点表示列引用、常量、函数调用等RelTraitSet节点的物理特性集如排序顺序Collation、数据分布DistributionRelDataType行类型定义包含列名和数据类型纯逻辑表达不包含任何执行策略与具体引擎无关标准 Calcite RelNode保留了完整的关系代数语义六、第四阶段查询优化Optimization这是 Flink SQL 编译链路中最核心的阶段Flink 使用两套优化器协同工作关键优化规则示例谓词下推Filter Push DownLocal-Global 聚合拆分在流模式下优化器还需要处理考量点说明Changelog 语义区分 Insert-only、Upsert、Retract 流选择合适的物理算子更新类型推导判断每个节点产出的是 I/-D/U/-U 中的哪些决定下游是否需要处理撤回状态 TTL聚合、Join 等有状态算子的状态清理策略影响结果正确性Mini-batch 优化攒批触发以减少状态访问和下游更新频率该阶段的作用-- 查看执行计划用于验证优化效果 EXPLAIN SELECT ...; -- Hint 干预 Join 策略 SELECT /* BROADCAST(small_table) */ * FROM large_table JOIN small_table ON ...; -- Hint 干预状态 TTL SELECT /* STATE_TTL(orders 1d, users 7d) */ * FROM orders JOIN users ON ...; # flink-conf.yaml 或 SET 命令 # Mini-batch 优化减少状态访问频次 table.exec.mini-batch.enabled: true table.exec.mini-batch.allow-latency: 5s table.exec.mini-batch.size: 1000 # Local-Global 聚合 table.optimizer.agg-phase-strategy: TWO_PHASE # 数据倾斜优化 table.optimizer.distinct-agg.split.enabled: true # 状态 TTL table.exec.state.ttl: 36h # Join 重排 table.optimizer.join-reorder-enabled: true最佳实践善用 EXPLAIN 查看执行计划在提交作业前通过 EXPLAIN 确认优化是否生效谓词是否下推、Join 策略是否合理。启用 Mini-batch对于高 QPS 的聚合/Join 场景Mini-batch 能显著降低状态访问频率和下游更新压力。合理设置状态 TTL流式 Join 和聚合会无限积累状态务必设置 TTL 避免状态膨胀导致 OOM。关注 Changelog 模式如果 Sink 不支持 Retract/Upsert需要确保上游查询只产出 Insert-only 流否则运行时报错。七、第五阶段物理计划生成Physical Planning优化后的逻辑 RelNode 被转换为 Flink 特有的物理 RelNodeFlinkPhysicalRel再进一步转为ExecNode。优化后逻辑计划: 物理计划: FlinkLogicalAggregate StreamExecGroupAggregate └── FlinkLogicalCalc └── StreamExecExchange(hash[user_id]) └── FlinkLogicalTableSourceScan └── StreamExecCalc └── StreamExecTableSourceScanExecNode是 Flink SQL 物理层的核心抽象。每个 ExecNode 负责将自身翻译为一组Transformation这里是 SQL 编译链路与 DataStream 运行时的汇合点。// ExecNode 接口核心方法 public interface ExecNodeT { /** * 将当前节点翻译为 Transformation * 这是 SQL 层到 DataStream 运行时的桥梁 */ TransformationT translateToPlan(Planner planner); }每个物理算子内部调用 DataStream 层的基础设施来构建 TransformationStreamExecGroupAggregate.translateToPlan() → 构建 KeyedProcessOperator(GroupAggFunction) → 包装为 OneInputTransformation → 设置并行度、资源、uid 等八、从 Transformation 到执行与 DataStream 的汇合从 Transformation 层开始两条路径完全共享同一套运行时基础设施同样的 StreamGraph 构建逻辑同样的算子链化优化同样的调度器和 Failover 机制同样的状态后端和 Checkpoint 协议同样的网络栈和反压机制九、一个完整 SQL 作业的图演变示例-- DDL CREATE TABLE orders ( order_id STRING, user_id STRING, amount DECIMAL(10,2), order_time TIMESTAMP(3), WATERMARK FOR order_time AS order_time - INTERVAL 5 SECOND ) WITH (connector kafka, ...); CREATE TABLE user_order_stats ( user_id STRING, window_start TIMESTAMP(3), order_cnt BIGINT, total_amount DECIMAL(10,2), PRIMARY KEY (user_id, window_start) NOT ENFORCED ) WITH (connector jdbc, ...); -- DML INSERT INTO user_order_stats SELECT user_id, window_start, COUNT(*) AS order_cnt, SUM(amount) AS total_amount FROM TABLE(TUMBLE(TABLE orders, DESCRIPTOR(order_time), INTERVAL 1 MINUTE)) GROUP BY user_id, window_start;编译链路演变1. Parse → SqlNode: SqlInsert └── SqlSelect(selectList[user_id, window_start, COUNT(*), SUM(amount)], fromTUMBLE(...), groupBy[user_id, window_start]) 2. Validate → 校验 orders 表存在、列类型匹配、TUMBLE 参数合法 3. Logical Plan → RelNode: LogicalSink(user_order_stats) └── LogicalProject(user_id, window_start, order_cnt, total_amount) └── LogicalWindowAggregate(group[user_id], window[TUMBLE(1min)], aggs[COUNT(*), SUM(amount)]) └── LogicalWatermarkAssigner(order_time - 5s) └── LogicalTableScan(orders) 4. Optimize → 优化后逻辑计划: - 投影下推只从 Kafka 读取 user_id, amount, order_time 三列 - 窗口聚合识别识别为 Window TVF Aggregate可使用高效窗口算子 5. Physical Plan → ExecNode: StreamExecSink(user_order_stats) └── StreamExecWindowAggregate(group[user_id], window[TUMBLE(1min)], select[user_id, window_start, COUNT(*), SUM(amount)]) └── StreamExecExchange(distribution[hash[user_id]]) └── StreamExecWindowTableFunction(TUMBLE, time_col[order_time], size[1min]) └── StreamExecWatermarkAssigner(order_time - 5s) └── StreamExecTableSourceScan(orders, fields[user_id, amount, order_time]) 6. Transformation (5 个): [KafkaSource] → [WatermarkAssigner] → [WindowFunction] → [Exchange] → [WindowAgg] → [JdbcSink] 7. StreamGraph (6 个 StreamNode): [Source(p3)] --Forward-- [Watermark(p3)] --Forward-- [WindowTVF(p3)] --Hash(user_id)-- [WindowAgg(p6)] --Forward-- [Sink(p6)] 8. JobGraph (算子链化后, 3 个 JobVertex): [Source→Watermark→WindowTVF Chain(p3)] --Hash-- [WindowAgg→Sink Chain(p6)] ※ Source/Watermark/WindowTVF 链化相同并行度 Forward ※ WindowAgg/Sink 链化相同并行度 Forward ※ 中间断开Hash 分区 并行度不同(3→6) 9. ExecutionGraph: [3 个 Source Chain EV] --Hash(ALL_TO_ALL)-- [6 个 WindowAggSink EV] 共 9 个 ExecutionVertex 10. 物理执行图: 3 个 Task(Source Chain) 6 个 Task(WindowAggSink) 9 个 Task 最少需要 6 个 Slot