Flink SQL, Knowing the Why: The Explain, Show, Load, and Set Clauses

Database 2025-11-05 03:00:51

Hi everyone, I'm Lao Yang. Today we'll look at four kinds of clauses in Flink SQL: Explain, Show, Load, and Set.

EXPLAIN Clause

Use case: The EXPLAIN clause shows the logical plan and the optimized execution plan of a SQL query.

SQL syntax:

    EXPLAIN PLAN FOR <query_statement_or_insert_statement>

Example:

    import org.apache.flink.table.api.TableResult;

    // FlinkEnv and FlinkEnvUtils are the author's own helper classes;
    // their package is not shown in this article.
    public class Explain_Test {

        public static void main(String[] args) throws Exception {

            FlinkEnv flinkEnv = FlinkEnvUtils.getStreamTableEnv(args);

            flinkEnv.env().setParallelism(1);

            String sql = "CREATE TABLE source_table (\n"
                    + "    user_id BIGINT COMMENT 'user id',\n"
                    + "    name STRING COMMENT 'user name',\n"
                    + "    server_timestamp BIGINT COMMENT 'user access timestamp',\n"
                    + "    proctime AS PROCTIME()\n"
                    + ") WITH (\n"
                    + "  'connector' = 'datagen',\n"
                    + "  'rows-per-second' = '1',\n"
                    + "  'fields.name.length' = '1',\n"
                    + "  'fields.user_id.min' = '1',\n"
                    + "  'fields.user_id.max' = '10',\n"
                    + "  'fields.server_timestamp.min' = '1',\n"
                    + "  'fields.server_timestamp.max' = '100000'\n"
                    + ");\n"
                    + "\n"
                    + "CREATE TABLE sink_table (\n"
                    + "    user_id BIGINT,\n"
                    + "    name STRING,\n"
                    + "    server_timestamp BIGINT\n"
                    + ") WITH (\n"
                    + "  'connector' = 'print'\n"
                    + ");\n"
                    + "\n"
                    + "EXPLAIN PLAN FOR\n"
                    + "INSERT INTO sink_table\n"
                    + "select user_id,\n"
                    + "       name,\n"
                    + "       server_timestamp\n"
                    + "from (\n"
                    + "      SELECT\n"
                    + "          user_id,\n"
                    + "          name,\n"
                    + "          server_timestamp,\n"
                    + "          row_number() over(partition by user_id order by proctime) as rn\n"
                    + "      FROM source_table\n"
                    + ")\n"
                    + "where rn = 1";

            /**
             * Operator {@link org.apache.flink.streaming.api.operators.KeyedProcessOperator}
             *   -- {@link org.apache.flink.table.runtime.operators.deduplicate.ProcTimeDeduplicateKeepFirstRowFunction}
             */

            // Run the two DDL statements and the EXPLAIN statement one by one.
            for (String innerSql : sql.split(";")) {
                TableResult tableResult = flinkEnv.streamTEnv().executeSql(innerSql);

                tableResult.print();
            }
        }
    }

Running the code above produces the following output:

1. Abstract syntax tree

    == Abstract Syntax Tree ==
    LogicalSink(table=[default_catalog.default_database.sink_table], fields=[user_id, name, server_timestamp])
    +- LogicalProject(user_id=[$0], name=[$1], server_timestamp=[$2])
       +- LogicalFilter(condition=[=($3, 1)])
          +- LogicalProject(user_id=[$0], name=[$1], server_timestamp=[$2], rn=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY PROCTIME() NULLS FIRST)])
             +- LogicalTableScan(table=[[default_catalog, default_database, source_table]])

2. Optimized physical plan

    == Optimized Physical Plan ==
    Sink(table=[default_catalog.default_database.sink_table], fields=[user_id, name, server_timestamp])
    +- Calc(select=[user_id, name, server_timestamp])
       +- Deduplicate(keep=[FirstRow], key=[user_id], order=[PROCTIME])
          +- Exchange(distribution=[hash[user_id]])
             +- Calc(select=[user_id, name, server_timestamp, PROCTIME() AS $3])
                +- TableSourceScan(table=[[default_catalog, default_database, source_table]], fields=[user_id, name, server_timestamp])

3. Optimized execution plan

    == Optimized Execution Plan ==
    Sink(table=[default_catalog.default_database.sink_table], fields=[user_id, name, server_timestamp])
    +- Calc(select=[user_id, name, server_timestamp])
       +- Deduplicate(keep=[FirstRow], key=[user_id], order=[PROCTIME])
          +- Exchange(distribution=[hash[user_id]])
             +- Calc(select=[user_id, name, server_timestamp, PROCTIME() AS $3])
                +- TableSourceScan(table=[[default_catalog, default_database, source_table]], fields=[user_id, name, server_timestamp])
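The example above feeds a multi-statement script to executeSql by calling sql.split(";"). That works for this script, but a plain split would cut a statement in half if a string literal ever contained a semicolon, and it can hand whitespace-only fragments to executeSql. A minimal sketch of a more careful splitter follows. SqlScriptSplitter is a hypothetical helper, not a Flink class, and it assumes the script contains no SQL comments or backtick-quoted identifiers with semicolons in them.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not part of Flink): splits a SQL script into statements. */
final class SqlScriptSplitter {

    private SqlScriptSplitter() {}

    /**
     * Splits on ';' characters that sit outside single-quoted literals and
     * drops blank fragments. Assumption: no SQL comments in the script.
     */
    static List<String> split(String script) {
        List<String> statements = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inLiteral = false;
        for (int i = 0; i < script.length(); i++) {
            char c = script.charAt(i);
            if (c == '\'') {
                // An escaped quote ('') toggles the flag twice in a row,
                // so the literal state is unchanged afterwards.
                inLiteral = !inLiteral;
            }
            if (c == ';' && !inLiteral) {
                addIfNotBlank(statements, current);
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        addIfNotBlank(statements, current);
        return statements;
    }

    private static void addIfNotBlank(List<String> out, StringBuilder sb) {
        String statement = sb.toString().trim();
        if (!statement.isEmpty()) {
            out.add(statement);
        }
    }

    public static void main(String[] args) {
        String script =
                "CREATE TABLE t (c STRING COMMENT 'a;b') WITH ('connector' = 'print');\n"
                        + "EXPLAIN PLAN FOR SELECT c FROM t;\n";
        for (String statement : split(script)) {
            System.out.println("[" + statement + "]");
        }
    }
}
```

In the loop from the example, the result of SqlScriptSplitter.split(sql) could be iterated in place of sql.split(";"); each element is already trimmed and non-empty.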

USE Clause

Use case: Anyone familiar with MySQL will recognize this clause. In MySQL, USE is typically used to switch databases. In Flink SQL it serves essentially the same purpose: switching the current Catalog or Database, and choosing which Modules are in use.

SQL syntax:

Switch Catalog:

    USE CATALOG catalog_name

Use Modules:

    USE MODULES module_name1[, module_name2, ...]

Switch Database:

    USE database_name

Example:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

    // create a catalog
    tEnv.executeSql("CREATE CATALOG cat1 WITH (...)");
    tEnv.executeSql("SHOW CATALOGS").print();
    // +-----------------+
    // | catalog name    |
    // +-----------------+
    // | default_catalog |
    // | cat1            |
    // +-----------------+

    // change default catalog
    tEnv.executeSql("USE CATALOG cat1");

    tEnv.executeSql("SHOW DATABASES").print();
    // databases are empty
    // +---------------+
    // | database name |
    // +---------------+
    // +---------------+

    // create a database
    tEnv.executeSql("CREATE DATABASE db1 WITH (...)");
    tEnv.executeSql("SHOW DATABASES").print();
    // +---------------+
    // | database name |
    // +---------------+
    // | db1           |
    // +---------------+

    // change default database
    tEnv.executeSql("USE db1");

    // change module resolution order and enabled status
    // (the hive module must already be loaded, e.g. with LOAD MODULE)
    tEnv.executeSql("USE MODULES hive");
    tEnv.executeSql("SHOW FULL MODULES").print();
    // +-------------+-------+
    // | module name | used  |
    // +-------------+-------+
    // | hive        | true  |
    // | core        | false |
    // +-------------+-------+
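What USE CATALOG and USE change in practice is the default prefix that unqualified object names are resolved against, which is why the plans in the EXPLAIN section print default_catalog.default_database.sink_table for a table declared simply as sink_table. The following is a toy model of that resolution in plain Java. ToySessionContext is a hypothetical class for illustration, not Flink's catalog manager; it ignores backtick quoting, and it assumes the caller supplies the default database of the catalog being switched to.

```java
/**
 * Toy model (hypothetical, not a Flink class) of the session state that
 * USE CATALOG and USE <database> modify.
 */
final class ToySessionContext {

    private String currentCatalog = "default_catalog";
    private String currentDatabase = "default_database";

    /** Models USE CATALOG; assumes the catalog's default database becomes current. */
    void useCatalog(String catalog, String catalogDefaultDatabase) {
        currentCatalog = requireName(catalog);
        currentDatabase = requireName(catalogDefaultDatabase);
    }

    /** Models USE <database> within the current catalog. */
    void useDatabase(String database) {
        currentDatabase = requireName(database);
    }

    /** Expands a 1-, 2- or 3-part name to catalog.database.object. */
    String qualify(String name) {
        String[] parts = requireName(name).split("\\.", -1);
        for (String part : parts) {
            requireName(part);
        }
        switch (parts.length) {
            case 1:
                return String.join(".", currentCatalog, currentDatabase, parts[0]);
            case 2:
                return String.join(".", currentCatalog, parts[0], parts[1]);
            case 3:
                return String.join(".", parts);
            default:
                throw new IllegalArgumentException("Too many name parts: " + name);
        }
    }

    private static String requireName(String s) {
        if (s == null || s.trim().isEmpty()) {
            throw new IllegalArgumentException("Empty name");
        }
        return s.trim();
    }

    public static void main(String[] args) {
        ToySessionContext ctx = new ToySessionContext();
        System.out.println(ctx.qualify("sink_table"));
        ctx.useCatalog("cat1", "default");
        ctx.useDatabase("db1");
        System.out.println(ctx.qualify("sink_table"));
    }
}
```

The point of the sketch is only that the same short name maps to different objects before and after a USE statement, so scripts that switch catalogs should either qualify names fully or keep their USE statements close to the queries that rely on them.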

SHOW Clause

Use case: Anyone familiar with MySQL will recognize this clause too. In MySQL, SHOW is commonly used to list databases, tables, functions, and so on, and Flink SQL works in a similar way. Flink SQL supports the following SHOW statements.

SQL syntax:

    SHOW CATALOGS: lists all Catalogs
    SHOW CURRENT CATALOG: shows the current Catalog
    SHOW DATABASES: lists all Databases in the current Catalog
    SHOW CURRENT DATABASE: shows the current Database
    SHOW TABLES: lists all tables in the current Database
    SHOW VIEWS: lists all views
    SHOW FUNCTIONS: lists all functions
    SHOW MODULES: lists all Modules (a Module is used to extend the set of available functions)

Example:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

    // show catalogs
    tEnv.executeSql("SHOW CATALOGS").print();
    // +-----------------+
    // | catalog name    |
    // +-----------------+
    // | default_catalog |
    // +-----------------+

    // show current catalog
    tEnv.executeSql("SHOW CURRENT CATALOG").print();
    // +----------------------+
    // | current catalog name |
    // +----------------------+
    // | default_catalog      |
    // +----------------------+

    // show databases
    tEnv.executeSql("SHOW DATABASES").print();
    // +------------------+
    // | database name    |
    // +------------------+
    // | default_database |
    // +------------------+

    // show current database
    tEnv.executeSql("SHOW CURRENT DATABASE").print();
    // +-----------------------+
    // | current database name |
    // +-----------------------+
    // | default_database      |
    // +-----------------------+

    // create a table
    tEnv.executeSql("CREATE TABLE my_table (...) WITH (...)");
    // show tables
    tEnv.executeSql("SHOW TABLES").print();
    // +------------+
    // | table name |
    // +------------+
    // | my_table   |
    // +------------+

    // create a view
    tEnv.executeSql("CREATE VIEW my_view AS ...");
    // show views
    tEnv.executeSql("SHOW VIEWS").print();
    // +-----------+
    // | view name |
    // +-----------+
    // | my_view   |
    // +-----------+

    // show functions
    tEnv.executeSql("SHOW FUNCTIONS").print();
    // +---------------+
    // | function name |
    // +---------------+
    // | mod           |
    // | sha256        |
    // | ...           |
    // +---------------+

    // create a user defined function
    tEnv.executeSql("CREATE FUNCTION f1 AS ...");
    // show user defined functions
    tEnv.executeSql("SHOW USER FUNCTIONS").print();
    // +---------------+
    // | function name |
    // +---------------+
    // | f1            |
    // | ...           |
    // +---------------+

    // show modules
    tEnv.executeSql("SHOW MODULES").print();
    // +-------------+
    // | module name |
    // +-------------+
    // | core        |
    // +-------------+

    // show full modules
    tEnv.executeSql("SHOW FULL MODULES").print();
    // +-------------+-------+
    // | module name | used  |
    // +-------------+-------+
    // | core        | true  |
    // | hive        | false |
    // +-------------+-------+

LOAD and UNLOAD Clauses

Use case: The LOAD clause loads a Module, either one built into Flink SQL or one defined by the user. The UNLOAD clause unloads such a Module.

SQL syntax:

    -- load
    LOAD MODULE module_name [WITH ('key1' = 'val1', 'key2' = 'val2', ...)]

    -- unload
    UNLOAD MODULE module_name

Example:

LOAD example:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

    // load the Hive module built into Flink SQL
    tEnv.executeSql("LOAD MODULE hive WITH ('hive-version' = '3.1.2')");
    tEnv.executeSql("SHOW MODULES").print();
    // +-------------+
    // | module name |
    // +-------------+
    // | core        |
    // | hive        |
    // +-------------+

UNLOAD example:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

    // unload the only loaded module, CoreModule
    tEnv.executeSql("UNLOAD MODULE core");
    tEnv.executeSql("SHOW MODULES").print();
    // the result is empty: no Module is left
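The outputs shown for LOAD MODULE, UNLOAD MODULE, USE MODULES, SHOW MODULES, and SHOW FULL MODULES are consistent with a simple bookkeeping model: a set of loaded modules plus an ordered list of the ones currently in use. The sketch below reproduces that model in plain Java so the interaction between the statements can be traced by hand. ToyModuleRegistry and its method names are hypothetical and are not Flink's implementation; the sketch assumes that loading a module also marks it as used, and that USE MODULES replaces the used list with exactly the names given.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Toy model (hypothetical, not Flink code) of module load/use bookkeeping. */
final class ToyModuleRegistry {

    private final Set<String> loaded = new LinkedHashSet<>();
    private final List<String> used = new ArrayList<>();

    ToyModuleRegistry() {
        load("core"); // a fresh environment starts with only the core module
    }

    /** Models LOAD MODULE: the module is loaded and appended to the used list. */
    void load(String name) {
        if (!loaded.add(name)) {
            throw new IllegalStateException("Module already loaded: " + name);
        }
        used.add(name);
    }

    /** Models UNLOAD MODULE: the module disappears from both collections. */
    void unload(String name) {
        if (!loaded.remove(name)) {
            throw new IllegalStateException("Module not loaded: " + name);
        }
        used.remove(name);
    }

    /** Models USE MODULES: sets which loaded modules are used, and in what order. */
    void use(String... names) {
        List<String> requested = Arrays.asList(names);
        if (new LinkedHashSet<>(requested).size() != requested.size()) {
            throw new IllegalArgumentException("Duplicate module name in " + requested);
        }
        if (!loaded.containsAll(requested)) {
            throw new IllegalStateException("Not all modules are loaded: " + requested);
        }
        used.clear();
        used.addAll(requested);
    }

    /** Models SHOW MODULES: used modules in resolution order. */
    List<String> showModules() {
        return new ArrayList<>(used);
    }

    /** Models SHOW FULL MODULES: used modules first, then loaded but unused ones. */
    List<String> showFullModules() {
        List<String> rows = new ArrayList<>();
        for (String module : used) {
            rows.add(module + " | true");
        }
        for (String module : loaded) {
            if (!used.contains(module)) {
                rows.add(module + " | false");
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        ToyModuleRegistry registry = new ToyModuleRegistry();
        registry.load("hive");
        System.out.println(registry.showModules());
        registry.use("hive");
        System.out.println(registry.showFullModules());
    }
}
```

Tracing the article's examples through this model: loading hive gives core then hive, using only hive leaves core loaded but marked unused, and unloading core from a fresh registry leaves nothing at all.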

SET and RESET Clauses

Use case: The SET clause modifies Flink SQL environment configuration, and the RESET clause restores configuration to its default values (a single key if one is given, otherwise every key). Both can only be used in the SQL CLI. They exist so that users can work purely in SQL without resorting to other mechanisms or switching system environments.

SQL syntax:

    SET (key = value)?

    RESET (key)?

Example:

After starting a SQL CLI, you can run the following SET statements in it:

    Flink SQL> SET table.planner = blink;
    [INFO] Session property has been set.

    Flink SQL> SET;
    table.planner=blink;

    Flink SQL> RESET table.planner;
    [INFO] Session property has been reset.

    Flink SQL> RESET;
    [INFO] All session properties have been set to their default values.
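The optional parts in the grammar above give four distinct behaviors: SET with a pair assigns one key, bare SET lists the current values, RESET with a key restores that key, and bare RESET restores everything. A small plain-Java model of those semantics is sketched below. ToySessionConfig is a hypothetical class, not the SQL CLI's parser; it only accepts the unquoted key = value form used in this article, and the default values passed to it are made up for the demonstration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Toy model (hypothetical, not the Flink SQL CLI) of SET / RESET semantics. */
final class ToySessionConfig {

    // SET (key = value)?  and  RESET (key)?  with unquoted keys and values.
    private static final Pattern SET =
            Pattern.compile("(?i)SET(?:\\s+([^\\s=]+)\\s*=\\s*(\\S+))?");
    private static final Pattern RESET =
            Pattern.compile("(?i)RESET(?:\\s+(\\S+))?");

    private final Map<String, String> defaults;
    private final Map<String, String> overrides = new TreeMap<>();

    ToySessionConfig(Map<String, String> defaults) {
        this.defaults = new TreeMap<>(defaults);
    }

    /** Applies one SET or RESET statement; a trailing ';' is optional. */
    void execute(String statement) {
        String s = statement.trim();
        if (s.endsWith(";")) {
            s = s.substring(0, s.length() - 1).trim();
        }
        Matcher set = SET.matcher(s);
        if (set.matches()) {
            if (set.group(1) != null) {
                overrides.put(set.group(1), set.group(2));
            }
            return; // bare SET changes nothing; callers read effective()
        }
        Matcher reset = RESET.matcher(s);
        if (reset.matches()) {
            if (reset.group(1) == null) {
                overrides.clear();               // RESET: everything back to defaults
            } else {
                overrides.remove(reset.group(1)); // RESET key: only that key
            }
            return;
        }
        throw new IllegalArgumentException("Unsupported statement: " + statement);
    }

    /** Defaults overlaid with session overrides, i.e. what bare SET would list. */
    Map<String, String> effective() {
        Map<String, String> result = new TreeMap<>(defaults);
        result.putAll(overrides);
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> defaults = new HashMap<>();
        defaults.put("table.planner", "some-default"); // made-up default value
        ToySessionConfig config = new ToySessionConfig(defaults);
        config.execute("SET table.planner = blink;");
        System.out.println(config.effective());
        config.execute("RESET table.planner;");
        System.out.println(config.effective());
    }
}
```

Keeping overrides separate from defaults is what makes both forms of RESET cheap in this model: restoring a value is just forgetting the override.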