Flink SQL Client CLI 使用入门
参照:https://blog.csdn.net/boling_cavalry/article/details/105964425
一、SQL Client
Flink 的 Table & SQL API 可以处理 SQL 语言编写的查询语句,但是这些查询需要嵌入用 Java 或 Scala 编写的程序中。此外,这些程序在提交到集群前需要用构建工具打包。这或多或少限制了 Java/Scala 程序员对 Flink 的使用。
SQL 客户端 的目的是提供一种简单的方式来编写、调试和提交表程序到 Flink 集群上,而无需写一行 Java 或 Scala 代码。SQL 客户端命令行界面(CLI) 能够在命令行中检索和可视化分布式应用中实时产生的结果。
二、局限性
The SQL Client scripts are also located in the binary directory of Flink. In the future, a user will have two possibilities of starting the SQL Client CLI either by starting an embedded standalone process or by connecting to a remote SQL Client Gateway. At the moment only the
embedded
mode is supported. You can start the CLI by calling:
目前最新的Flink-1.12版本中,SQL Client只是个Beta版本(不适合用于生产环境),并且只能连接到本地Flink,不能像mysql、cassandra 等客户端工具那样远程连接server,这些在将来的版本会解决。具体参见 Flink 官网
三、实战操作
1、准备环境
Flink 1.12, JDK8
Flink 1.12 官网下载地址:https://flink.apache.org/downloads.html
2、解压 flink,进入 bin 目录
3、启动本地集群
[root@master bin]# ./start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host master.
Starting taskexecutor daemon on host master.
4、启动 sql client cli
[root@master bin]# ./sql-client.sh embedded
No default environment specified.
Searching for '/hadoop_data/soft/flink-1.12.2/conf/sql-client-defaults.yaml'...found.
Reading default environment from: file:/hadoop_data/soft/flink-1.12.2/conf/sql-client-defaults.yaml
No session environment specified.Command history file path: /root/.flink-sql-history
▒▓██▓██▒
▓████▒▒█▓▒▓███▓▒
▓███▓░░ ▒▒▒▓██▒ ▒
░██▒ ▒▒▓▓█▓▓▒░ ▒████
██▒ ░▒▓███▒ ▒█▒█▒
░▓█ ███ ▓░▒██
▓█ ▒▒▒▒▒▓██▓░▒░▓▓█
█░ █ ▒▒░ ███▓▓█ ▒█▒▒▒
████░ ▒▓█▓ ██▒▒▒ ▓███▒
░▒█▓▓██ ▓█▒ ▓█▒▓██▓ ░█░
▓░▒▓████▒ ██ ▒█ █▓░▒█▒░▒█▒
███▓░██▓ ▓█ █ █▓ ▒▓█▓▓█▒
░██▓ ░█░ █ █▒ ▒█████▓▒ ██▓░▒
███░ ░ █░ ▓ ░█ █████▒░░ ░█░▓ ▓░
██▓█ ▒▒▓▒ ▓███████▓░ ▒█▒ ▒▓ ▓██▓
▒██▓ ▓█ █▓█ ░▒█████▓▓▒░ ██▒▒ █ ▒ ▓█▒
▓█▓ ▓█ ██▓ ░▓▓▓▓▓▓▓▒ ▒██▓ ░█▒
▓█ █ ▓███▓▒░ ░▓▓▓███▓ ░▒░ ▓█
██▓ ██▒ ░▒▓▓███▓▓▓▓▓██████▓▒ ▓███ █
▓███▒ ███ ░▓▓▒░░ ░▓████▓░ ░▒▓▒ █▓
█▓▒▒▓▓██ ░▒▒░░░▒▒▒▒▓██▓░ █▓
██ ▓░▒█ ▓▓▓▓▒░░ ▒█▓ ▒▓▓██▓ ▓▒ ▒▒▓
▓█▓ ▓▒█ █▓░ ░▒▓▓██▒ ░▓█▒ ▒▒▒░▒▒▓█████▒
██░ ▓█▒█▒ ▒▓▓▒ ▓█ █░ ░░░░ ░█▒
▓█ ▒█▓ ░ █░ ▒█ █▓
█▓ ██ █░ ▓▓ ▒█▓▓▓▒█░
█▓ ░▓██░ ▓▒ ▓█▓▒░░░▒▓█░ ▒█
██ ▓█▓░ ▒ ░▒█▒██▒ ▓▓
▓█▒ ▒█▓▒░ ▒▒ █▒█▓▒▒░░▒██
░██▒ ▒▓▓▒ ▓██▓▒█▒ ░▓▓▓▓▒█▓
░▓██▒ ▓░ ▒█▓█ ░░▒▒▒
▒▓▓▓▓▓▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒░░▓▓ ▓░▒█░
______ _ _ _ _____ ____ _ _____ _ _ _ BETA
| ____| (_) | | / ____|/ __ \| | / ____| (_) | |
| |__ | |_ _ __ | | __ | (___ | | | | | | | | |_ ___ _ __ | |_
| __| | | | '_ \| |/ / \___ \| | | | | | | | | |/ _ \ '_ \| __|
| | | | | | | | < ____) | |__| | |____ | |____| | | __/ | | | |_
|_| |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__|
Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.Flink SQL>
默认情况下,SQL 客户端将从 ./conf/sql-client-defaults.yaml 中读取配置,我们也可以指定配置文件,后面有例子。
5、查看 help 帮助命令。
Flink SQL> help;
The following commands are available:CLEAR Clears the current terminal.
CREATE TABLE Create table under current catalog and database.
DROP TABLE Drop table with optional catalog and database. Syntax: 'DROP TABLE [IF EXISTS] <name>;'
CREATE VIEW Creates a virtual table from a SQL query. Syntax: 'CREATE VIEW <name> AS <query>;'
DESCRIBE Describes the schema of a table with the given name.
DROP VIEW Deletes a previously created virtual table. Syntax: 'DROP VIEW <name>;'
EXPLAIN Describes the execution plan of a query or table with the given name.
HELP Prints the available commands.
INSERT INTO Inserts the results of a SQL SELECT query into a declared table sink.
INSERT OVERWRITE Inserts the results of a SQL SELECT query into a declared table sink and overwrite existing data.
QUIT Quits the SQL CLI client.
RESET Resets all session configuration properties.
SELECT Executes a SQL SELECT query on the Flink cluster.
SET Sets a session configuration property. Syntax: 'SET <key>=<value>;'. Use 'SET;' for listing all properties.
SHOW FUNCTIONS Shows all user-defined and built-in functions.
SHOW TABLES Shows all registered tables.
SOURCE Reads a SQL SELECT query from a file and executes it on the Flink cluster.
USE CATALOG Sets the current catalog. The current database is set to the catalog's default one. Experimental! Syntax: 'USE CATALOG <name>;'
USE Sets the current default database. Experimental! Syntax: 'USE <name>;'Hint: Make sure that a statement ends with ';' for finalizing (multi-line) statements.
6、执行 sql 查询。
select 'hello world';
输入 q 退出结果视图
7、cli 提供三种可视化结果
(1)表格模式(table mode)在内存中实体化结果,并将结果用规则的分页表格可视化展示出来。执行如下命令启用:
SET execution.result-mode=table;
(2)变更日志模式(changelog mode)不会实体化和可视化结果,而是由插入(+)和撤销(-)组成的持续查询产生结果流。
SET execution.result-mode=changelog;
(3)tableau模式(tableau mode)更接近传统的数据库,会将执行的结果以制表的形式直接打在屏幕之上。具体显示的内容会取决于作业 执行模式的不同(execution.type):
SET execution.result-mode=tableau;
注意当你使用这个模式运行一个流式查询的时候,Flink 会将结果持续的打印在当前的屏幕之上。如果这个流式查询的输入是有限的数据集, 那么Flink在处理完所有的数据之后,会自动的停止作业,同时屏幕上的打印也会相应的停止。如果你想提前结束这个查询,那么可以直接使用 CTRL-C 按
键,这个会停掉作业同时停止屏幕上的打印。
我们可以用如下查询语句来查看三种结果模式的运行情况:
SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;
变更日志模式 下,看到的结果应该类似于:
+ Bob, 1
+ Alice, 1
+ Greg, 1
- Bob, 1
+ Bob, 2
表格模式 下,可视化结果表将不断更新,直到表程序内容结束:
Bob, 2
Alice, 1
Greg, 1
Tableau模式 下,如果这个查询以流的方式执行,那么将显示以下内容:
+-----+----------------------+----------------------+
| +/- | name | cnt |
+-----+----------------------+----------------------+
| + | Bob | 1 |
| + | Alice | 1 |
| + | Greg | 1 |
| - | Bob | 1 |
| + | Bob | 2 |
+-----+----------------------+----------------------+
Received a total of 5 rows
如果这个查询以批的方式执行,显示的内容如下:
+-------+-----+
| name | cnt |
+-------+-----+
| Alice | 1 |
| Bob | 2 |
| Greg | 1 |
+-------+-----+
3 rows in set
这几种结果模式在 SQL 查询的原型设计过程中都非常有用。这些模式的结果都存储在 SQL 客户端 的 Java 堆内存中。为了保持 CLI 界面及时响应,变更日志模式仅显示最近的 1000 个更改。表格模式支持浏览更大的结果,这些结果仅受可用主内存和配置的最大行数(max-table-result-rows)
的限制。
注意:
在批处理环境下执行的查询只能用表格模式或者Tableau模式进行检索。
定义查询语句后,可以将其作为长时间运行的独立 Flink 作业提交给集群。为此,其目标系统需要使用 INSERT INTO 语句指定存储结果。配置部分解释如何声明读取数据的 table source,写入数据的 sink 以及配置其他表程序属性的方法。
四、项目进阶
前面写了几行SQL,对Flink SQL Client有了最基本的感受,接下来做进一步的体验,内容如下:
(1)、创建CSV文件,这是个最简单的图书信息表,只有三个字段:名字、数量、类目,一共十条记录;
(2)、创建SQL Client用到的环境配置文件,该文件描述了数据源以及对应的表的信息;
(3)、启动SQL Client,执行SQL查询上述CSV文件;
(4)、整个操作步骤如下图所示:
创建 csv 文件 -> 启动 Flink -> 编写环境配置文件 -> 执行SQL(查表) -> 执行 SQL(查视图)
1、创建 book-store.csv 文件,内容如下
name001,1,aaa
name002,2,aaa
name003,3,bbb
name004,4,bbb
name005,5,bbb
name006,6,ccc
name007,7,ccc
name008,8,ccc
name009,9,ccc
name010,10,ccc
2、在 flink/config/ 下创建 book-store.yaml 文件,内容如下
tables:- name: BookStoretype: source-tableupdate-mode: appendconnector:type: filesystempath: "/hadoop_data/soft/test/book-store.csv"format:type: csvfields:- name: BookNametype: VARCHAR- name: BookAmounttype: INT- name: BookCatalogtype: VARCHARline-delimiter: "\n"comment-prefix: ","schema:- name: BookNametype: VARCHAR- name: BookAmounttype: INT- name: BookCatalogtype: VARCHAR- name: MyBookViewtype: viewquery: "SELECT BookCatalog, SUM(BookAmount) AS Amount FROM BookStore GROUP BY BookCatalog"execution:planner: blink # 可选:'blink' (默认)或 'old'type: streaming # 必选:执行模式为 'batch' 或 'streaming'result-mode: table # 必选:'table' 或 'changelog'max-table-result-rows: 1000000 # 可选:'table' 模式下可维护的最大行数(默认为 1000000,小于 1 则表示无限制)time-characteristic: event-time # 可选:'processing-time' 或 'event-time' (默认)parallelism: 1 # 可选:Flink 的并行度(默认为 1)periodic-watermarks-interval: 200 # 可选:周期性 watermarks 的间隔时间(默认 200 ms)max-parallelism: 16 # 可选:Flink 的最大并行度(默认 128)min-idle-state-retention: 0 # 可选:表程序的最小空闲状态时间max-idle-state-retention: 0 # 可选:表程序的最大空闲状态时间(默认为当前 catalog 的默认数据库)current-catalog: default_catalog # 可选:当前会话 catalog 的名称(默认为 'default_catalog')current-database: default_database # 可选:当前 catalog 的当前数据库名称(默认为当前 catalog 的默认数据库)restart-strategy: # 可选:重启策略(restart-strategy)type: fallback # 默认情况下"回退"到全局重启策略# 用于调整和调优表程序的配置选项。# 在专用的"配置"页面上可以找到完整的选项列表及其默认值。
configuration:table.optimizer.join-reorder-enabled: truetable.exec.spill-compression.enabled: truetable.exec.spill-compression.block-size: 128kb# 描述表程序提交集群的属性。deployment:response-timeout: 5000
对于book-store.yaml文件,有以下几处需要注意:
- tables.type 等于 source-table,表明这是数据源的配置信息;
- tables.connector 描述了详细的数据源信息,path 是 book-store.csv 文件的完整路径;
- tables.format 描述了文件内容;
- tables.schema 描述了数据源表的表结构;
- type 为 view 表示 MyBookView 是个视图(参考数据库的视图概念);
3、在 flink/bin 目录执行以下命令,启动SQL Client,并指定book-store.yaml为环境配置
[root@master bin]# ./sql-client.sh embedded -d ../conf/book-store.yaml
Reading default environment from: file:/hadoop_data/soft/flink-1.12.2/bin/../conf/book-store.yaml
No session environment specified.Command history file path: /root/.flink-sql-history▒▓██▓██▒▓████▒▒█▓▒▓███▓▒▓███▓░░ ▒▒▒▓██▒ ▒░██▒ ▒▒▓▓█▓▓▒░ ▒██████▒ ░▒▓███▒ ▒█▒█▒░▓█ ███ ▓░▒██▓█ ▒▒▒▒▒▓██▓░▒░▓▓██░ █ ▒▒░ ███▓▓█ ▒█▒▒▒████░ ▒▓█▓ ██▒▒▒ ▓███▒░▒█▓▓██ ▓█▒ ▓█▒▓██▓ ░█░▓░▒▓████▒ ██ ▒█ █▓░▒█▒░▒█▒███▓░██▓ ▓█ █ █▓ ▒▓█▓▓█▒░██▓ ░█░ █ █▒ ▒█████▓▒ ██▓░▒███░ ░ █░ ▓ ░█ █████▒░░ ░█░▓ ▓░██▓█ ▒▒▓▒ ▓███████▓░ ▒█▒ ▒▓ ▓██▓▒██▓ ▓█ █▓█ ░▒█████▓▓▒░ ██▒▒ █ ▒ ▓█▒▓█▓ ▓█ ██▓ ░▓▓▓▓▓▓▓▒ ▒██▓ ░█▒▓█ █ ▓███▓▒░ ░▓▓▓███▓ ░▒░ ▓███▓ ██▒ ░▒▓▓███▓▓▓▓▓██████▓▒ ▓███ █▓███▒ ███ ░▓▓▒░░ ░▓████▓░ ░▒▓▒ █▓█▓▒▒▓▓██ ░▒▒░░░▒▒▒▒▓██▓░ █▓██ ▓░▒█ ▓▓▓▓▒░░ ▒█▓ ▒▓▓██▓ ▓▒ ▒▒▓▓█▓ ▓▒█ █▓░ ░▒▓▓██▒ ░▓█▒ ▒▒▒░▒▒▓█████▒██░ ▓█▒█▒ ▒▓▓▒ ▓█ █░ ░░░░ ░█▒▓█ ▒█▓ ░ █░ ▒█ █▓█▓ ██ █░ ▓▓ ▒█▓▓▓▒█░█▓ ░▓██░ ▓▒ ▓█▓▒░░░▒▓█░ ▒███ ▓█▓░ ▒ ░▒█▒██▒ ▓▓▓█▒ ▒█▓▒░ ▒▒ █▒█▓▒▒░░▒██░██▒ ▒▓▓▒ ▓██▓▒█▒ ░▓▓▓▓▒█▓░▓██▒ ▓░ ▒█▓█ ░░▒▒▒▒▓▓▓▓▓▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒░░▓▓ ▓░▒█░______ _ _ _ _____ ____ _ _____ _ _ _ BETA | ____| (_) | | / ____|/ __ \| | / ____| (_) | | | |__ | |_ _ __ | | __ | (___ | | | | | | | | |_ ___ _ __ | |_ | __| | | | '_ \| |/ / \___ \| | | | | | | | | |/ _ \ '_ \| __|| | | | | | | | < ____) | |__| | |____ | |____| | | __/ | | | |_ |_| |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__|Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.Flink SQL>
4、查全表
Flink SQL> show catalogs;
default_catalogFlink SQL> use catalog default_catalog;Flink SQL> show databases;
default_databaseFlink SQL> use default_database;Flink SQL> show tables;
BookStore
MyBookViewFlink SQL> select * from BookStore;
5、按照BookCatalog分组统计记录数
Flink SQL> DESCRIBE BookStore;
+-------------+--------+------+-----+--------+-----------+
| name | type | null | key | extras | watermark |
+-------------+--------+------+-----+--------+-----------+
| BookName | STRING | true | | | |
| BookAmount | INT | true | | | |
| BookCatalog | STRING | true | | | |
+-------------+--------+------+-----+--------+-----------+
3 rows in setFlink SQL> select BookCatalog,count(*) from BookStore group by BookCatalog;
6、查询视图
select * from MyBookView;
【一起学习】
Flink SQL Client CLI 使用入门相关推荐
- Flink SQL Client方言切换与datagen->Hive(DDL形式+streaming形式)
概述 本文是对[1]的完整复现,补充了[1]中缺失的大量细节. 切换方言 切换目标 Flink SQL Client命令 切换为hive SET table.sql-dialect=hive; 切换为 ...
- flink sql client读取hive时卡住
问题复现如下: 查看$FLINK_HOME/log/flink-appleyuchi-sql-client-Desktop.log 2020-12-23 11:48:56,811 INFO org. ...
- Flink SQL Client讀取csv中的數據(轉載+總結)
根據官方文檔[2] Flink SQL啓動方式 啓動命令 (1)starting an embedded standalone process $FLINK_HOME/bin/sql-client.s ...
- Flink SQL Client注册SCALA UDF完整流程
UDF的完整maven工程與SQL https://github.com/appleyuchi/Flink_SQL_Client_UDF 完整操作步骤 ①mvn scala:compile packa ...
- Flink SQL Client注册JAVA UDF完整流程
概述 听大佬说[1]里面有flink sql client注册udf的方法 去看了一眼,全是文字,闹心,索性琢磨了一下,记录下来. UDF的完整maven工程 https://github.com/a ...
- Flink SQL Client实现CDC实验
概述 本文主要是對[7]中內容的復現 环境 组件 版本 Flink(HA) 1.12 Zookeeper 3.6.0 flink-sql-connector-mysql-cdc 1.1.1 Mysql ...
- Flink SQL Client进行Kafka事实表与Hbase维度表Join(纯DDL/SQL方式)
概述: 對參考鏈接[1]進行DDL上的復現. 一些基本的業務常识 來源載體 數據特點 維表 Mysql/Csv/Hbase 很少變化 事實表 Kafka 不停變化 开发环境与准备工作 组件 版本 ...
- Flink SQL Client读Kafka+流计算(DDL方式+代碼嵌入DDL/SQL方式)
#################################################################################################### ...
- flink sql client讀取kafka數據的timestamp(DDL方式)
实验目的 Kafka的数据能让Flink SQL Client读取到 本文是对[1]的详细记载 具体操作步骤 ①啓動hadoop集羣,離開安全模式 ②各个节点都关闭防火墙: service firew ...
最新文章
- oracle里面有emp表么,Oracle自带表(EMP)SQL语句练习
- Linux kernel 学习笔记(1) --分段分页保护机制
- 简单哈弗曼树(Java)
- 排序算法_桶排序(箱排序)
- 大数据图数据库之离线挖掘计算模型
- 使用Spark Streaming SQL基于时间窗口进行数据统计
- 微信小程序 Unexpected end of JSON input
- 第 7 章 本地方法栈
- PNG in IE - 1 - pngfix.js
- linux下tomcat部署java web项目_在linux下用tomcat部署java web项目的过程与注意事项
- php动态添加属性,php – Yii2.动态添加属性和规则到模型
- 【matlab】从图片中截取矩形区域(手工选取/标记在原图上/截取矩形区域并保存)
- centos 常用的网络登录端口测试工具
- CRC循环冗余校验码计算器(附C++ 和Qt实现的CRC-16/MODBUS代码)
- PHP用GD库实现简单的验证码
- 微信投票系统平台大全
- 崔希凡-javaWeb-笔记day07-day09(2016年7月26日23:17:27)
- 线性代数(九) : 矩阵的行简化阶梯型和标准型
- 模电——三极管各个引脚之间的电压关系
- Conversion failed when converting date and/or time from character string.