参照:https://blog.csdn.net/boling_cavalry/article/details/105964425

一、SQL Client

Flink 的 Table & SQL API 可以处理 SQL 语言编写的查询语句,但是这些查询需要嵌入用 Java 或 Scala 编写的程序中。此外,这些程序在提交到集群前需要用构建工具打包。这或多或少限制了 Java/Scala 程序员对 Flink 的使用。

SQL 客户端 的目的是提供一种简单的方式来编写、调试和提交表程序到 Flink 集群上,而无需写一行 Java 或 Scala 代码。SQL 客户端命令行界面(CLI) 能够在命令行中检索和可视化分布式应用中实时产生的结果。

二、局限性

The SQL Client scripts are also located in the binary directory of Flink. In the future, a user will have two possibilities of starting the SQL Client CLI either by starting an embedded standalone process or by connecting to a remote SQL Client Gateway. At the moment only the embedded mode is supported. You can start the CLI by calling:

目前最新的Flink-1.12版本中,SQL Client只是个Beta版本(不适合用于生产环境),并且只能连接到本地Flink,不能像mysql、cassandra 等客户端工具那样远程连接server,这些在将来的版本会解决。具体参见 Flink 官网

三、实战操作

1、准备环境

Flink 1.12, JDK8

Flink 1.12 官网下载地址:https://flink.apache.org/downloads.html

2、解压 flink,进入 bin 目录

3、启动本地集群

[root@master bin]# ./start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host master.
Starting taskexecutor daemon on host master.

4、启动 sql client cli

[root@master bin]# ./sql-client.sh embedded
No default environment specified.
Searching for '/hadoop_data/soft/flink-1.12.2/conf/sql-client-defaults.yaml'...found.
Reading default environment from: file:/hadoop_data/soft/flink-1.12.2/conf/sql-client-defaults.yaml
No session environment specified.

Command history file path: /root/.flink-sql-history
                                   ▒▓██▓██▒
                               ▓████▒▒█▓▒▓███▓▒
                            ▓███▓░░        ▒▒▒▓██▒  ▒
                          ░██▒   ▒▒▓▓█▓▓▒░      ▒████
                          ██▒         ░▒▓███▒    ▒█▒█▒
                            ░▓█            ███   ▓░▒██
                              ▓█       ▒▒▒▒▒▓██▓░▒░▓▓█
                            █░ █   ▒▒░       ███▓▓█ ▒█▒▒▒
                            ████░   ▒▓█▓      ██▒▒▒ ▓███▒
                         ░▒█▓▓██       ▓█▒    ▓█▒▓██▓ ░█░
                   ▓░▒▓████▒ ██         ▒█    █▓░▒█▒░▒█▒
                  ███▓░██▓  ▓█           █   █▓ ▒▓█▓▓█▒
                ░██▓  ░█░            █  █▒ ▒█████▓▒ ██▓░▒
               ███░ ░ █░          ▓ ░█ █████▒░░    ░█░▓  ▓░
              ██▓█ ▒▒▓▒          ▓███████▓░       ▒█▒ ▒▓ ▓██▓
           ▒██▓ ▓█ █▓█       ░▒█████▓▓▒░         ██▒▒  █ ▒  ▓█▒
           ▓█▓  ▓█ ██▓ ░▓▓▓▓▓▓▓▒              ▒██▓           ░█▒
           ▓█    █ ▓███▓▒░              ░▓▓▓███▓          ░▒░ ▓█
           ██▓    ██▒    ░▒▓▓███▓▓▓▓▓██████▓▒            ▓███  █
          ▓███▒ ███   ░▓▓▒░░   ░▓████▓░                  ░▒▓▒  █▓
          █▓▒▒▓▓██  ░▒▒░░░▒▒▒▒▓██▓░                            █▓
          ██ ▓░▒█   ▓▓▓▓▒░░  ▒█▓       ▒▓▓██▓    ▓▒          ▒▒▓
          ▓█▓ ▓▒█  █▓░  ░▒▓▓██▒            ░▓█▒   ▒▒▒░▒▒▓█████▒
           ██░ ▓█▒█▒  ▒▓▓▒  ▓█                █░      ░░░░   ░█▒
           ▓█   ▒█▓   ░     █░                ▒█              █▓
            █▓   ██         █░                 ▓▓        ▒█▓▓▓▒█░
             █▓ ░▓██░       ▓▒                  ▓█▓▒░░░▒▓█░    ▒█
              ██   ▓█▓░      ▒                    ░▒█▒██▒      ▓▓
               ▓█▒   ▒█▓▒░                         ▒▒ █▒█▓▒▒░░▒██
                ░██▒    ▒▓▓▒                     ▓██▓▒█▒ ░▓▓▓▓▒█▓
                  ░▓██▒                          ▓░  ▒█▓█  ░░▒▒▒
                      ▒▓▓▓▓▓▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒░░▓▓  ▓░▒█░
          
    ______ _ _       _       _____  ____  _         _____ _ _            _  BETA   
   |  ____| (_)     | |     / ____|/ __ \| |       / ____| (_)          | |  
   | |__  | |_ _ __ | | __ | (___ | |  | | |      | |    | |_  ___ _ __ | |_ 
   |  __| | | | '_ \| |/ /  \___ \| |  | | |      | |    | | |/ _ \ '_ \| __|
   | |    | | | | | |   <   ____) | |__| | |____  | |____| | |  __/ | | | |_ 
   |_|    |_|_|_| |_|_|\_\ |_____/ \___\_\______|  \_____|_|_|\___|_| |_|\__|
          
        Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.

Flink SQL>

默认情况下,SQL 客户端将从 ./conf/sql-client-defaults.yaml 中读取配置,我们也可以指定配置文件,后面有例子。

5、查看 help 帮助命令。

Flink SQL> help;
The following commands are available:

CLEAR           Clears the current terminal.
CREATE TABLE            Create table under current catalog and database.
DROP TABLE              Drop table with optional catalog and database. Syntax: 'DROP TABLE [IF EXISTS] <name>;'
CREATE VIEW             Creates a virtual table from a SQL query. Syntax: 'CREATE VIEW <name> AS <query>;'
DESCRIBE                Describes the schema of a table with the given name.
DROP VIEW               Deletes a previously created virtual table. Syntax: 'DROP VIEW <name>;'
EXPLAIN         Describes the execution plan of a query or table with the given name.
HELP            Prints the available commands.
INSERT INTO             Inserts the results of a SQL SELECT query into a declared table sink.
INSERT OVERWRITE                Inserts the results of a SQL SELECT query into a declared table sink and overwrite existing data.
QUIT            Quits the SQL CLI client.
RESET           Resets all session configuration properties.
SELECT          Executes a SQL SELECT query on the Flink cluster.
SET             Sets a session configuration property. Syntax: 'SET <key>=<value>;'. Use 'SET;' for listing all properties.
SHOW FUNCTIONS          Shows all user-defined and built-in functions.
SHOW TABLES             Shows all registered tables.
SOURCE          Reads a SQL SELECT query from a file and executes it on the Flink cluster.
USE CATALOG             Sets the current catalog. The current database is set to the catalog's default one. Experimental! Syntax: 'USE CATALOG <name>;'
USE             Sets the current default database. Experimental! Syntax: 'USE <name>;'

Hint: Make sure that a statement ends with ';' for finalizing (multi-line) statements.

6、执行 sql 查询。

select 'hello world';

输入 q 退出结果视图

7、cli 提供三种可视化结果

(1)表格模式(table mode)在内存中实体化结果,并将结果用规则的分页表格可视化展示出来。执行如下命令启用:

SET execution.result-mode=table;

(2)变更日志模式(changelog mode)不会实体化和可视化结果,而是由插入(+)和撤销(-)组成的持续查询产生结果流。

SET execution.result-mode=changelog;

(3)tableau模式(tableau mode)更接近传统的数据库,会将执行的结果以制表的形式直接打在屏幕之上。具体显示的内容会取决于作业 执行模式的不同(execution.type):

SET execution.result-mode=tableau;

注意当你使用这个模式运行一个流式查询的时候,Flink 会将结果持续的打印在当前的屏幕之上。如果这个流式查询的输入是有限的数据集, 那么Flink在处理完所有的数据之后,会自动的停止作业,同时屏幕上的打印也会相应的停止。如果你想提前结束这个查询,那么可以直接使用 CTRL-C 按

键,这个会停掉作业同时停止屏幕上的打印。

我们可以用如下查询语句来查看三种结果模式的运行情况:

SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;

变更日志模式 下,看到的结果应该类似于:

+ Bob, 1
+ Alice, 1
+ Greg, 1
- Bob, 1
+ Bob, 2

表格模式 下,可视化结果表将不断更新,直到表程序内容结束:

Bob, 2
Alice, 1
Greg, 1

Tableau模式 下,如果这个查询以流的方式执行,那么将显示以下内容:

+-----+----------------------+----------------------+
| +/- |                 name |                  cnt |
+-----+----------------------+----------------------+
|   + |                  Bob |                    1 |
|   + |                Alice |                    1 |
|   + |                 Greg |                    1 |
|   - |                  Bob |                    1 |
|   + |                  Bob |                    2 |
+-----+----------------------+----------------------+
Received a total of 5 rows

如果这个查询以批的方式执行,显示的内容如下:

+-------+-----+
|  name | cnt |
+-------+-----+
| Alice |   1 |
|   Bob |   2 |
|  Greg |   1 |
+-------+-----+
3 rows in set

这几种结果模式在 SQL 查询的原型设计过程中都非常有用。这些模式的结果都存储在 SQL 客户端 的 Java 堆内存中。为了保持 CLI 界面及时响应,变更日志模式仅显示最近的 1000 个更改。表格模式支持浏览更大的结果,这些结果仅受可用主内存和配置的最大行数(max-table-result-rows)

的限制。

注意:

在批处理环境下执行的查询只能用表格模式或者Tableau模式进行检索。

定义查询语句后,可以将其作为长时间运行的独立 Flink 作业提交给集群。为此,其目标系统需要使用 INSERT INTO 语句指定存储结果。配置部分解释如何声明读取数据的 table source,写入数据的 sink 以及配置其他表程序属性的方法。

四、项目进阶

前面写了几行SQL,对Flink SQL Client有了最基本的感受,接下来做进一步的体验,内容如下:

(1)、创建CSV文件,这是个最简单的图书信息表,只有三个字段:名字、数量、类目,一共十条记录;

(2)、创建SQL Client用到的环境配置文件,该文件描述了数据源以及对应的表的信息;

(3)、启动SQL Client,执行SQL查询上述CSV文件;

(4)、整个操作步骤如下图所示:

创建 csv 文件 -> 启动 Flink -> 编写环境配置文件 -> 执行SQL(查表) -> 执行 SQL(查视图)

1、创建 book-store.csv 文件,内容如下

name001,1,aaa
name002,2,aaa
name003,3,bbb
name004,4,bbb
name005,5,bbb
name006,6,ccc
name007,7,ccc
name008,8,ccc
name009,9,ccc
name010,10,ccc

2、在 flink/config/ 下创建 book-store.yaml 文件,内容如下

tables:- name: BookStoretype: source-tableupdate-mode: appendconnector:type: filesystempath: "/hadoop_data/soft/test/book-store.csv"format:type: csvfields:- name: BookNametype: VARCHAR- name: BookAmounttype: INT- name: BookCatalogtype: VARCHARline-delimiter: "\n"comment-prefix: ","schema:- name: BookNametype: VARCHAR- name: BookAmounttype: INT- name: BookCatalogtype: VARCHAR- name: MyBookViewtype: viewquery: "SELECT BookCatalog, SUM(BookAmount) AS Amount FROM BookStore GROUP BY BookCatalog"execution:planner: blink                     # 可选:'blink' (默认)或 'old'type: streaming                    # 必选:执行模式为 'batch' 或 'streaming'result-mode: table                 # 必选:'table' 或 'changelog'max-table-result-rows: 1000000     # 可选:'table' 模式下可维护的最大行数(默认为 1000000,小于 1 则表示无限制)time-characteristic: event-time    # 可选:'processing-time' 或 'event-time' (默认)parallelism: 1                     # 可选:Flink 的并行度(默认为 1)periodic-watermarks-interval: 200  # 可选:周期性 watermarks 的间隔时间(默认 200 ms)max-parallelism: 16                # 可选:Flink 的最大并行度(默认 128)min-idle-state-retention: 0        # 可选:表程序的最小空闲状态时间max-idle-state-retention: 0        # 可选:表程序的最大空闲状态时间(默认为当前 catalog 的默认数据库)current-catalog: default_catalog   # 可选:当前会话 catalog 的名称(默认为 'default_catalog')current-database: default_database # 可选:当前 catalog 的当前数据库名称(默认为当前 catalog 的默认数据库)restart-strategy:                  # 可选:重启策略(restart-strategy)type: fallback                     # 默认情况下"回退"到全局重启策略# 用于调整和调优表程序的配置选项。# 在专用的"配置"页面上可以找到完整的选项列表及其默认值。
configuration:table.optimizer.join-reorder-enabled: truetable.exec.spill-compression.enabled: truetable.exec.spill-compression.block-size: 128kb# 描述表程序提交集群的属性。deployment:response-timeout: 5000

对于book-store.yaml文件,有以下几处需要注意:

  • tables.type 等于 source-table,表明这是数据源的配置信息;
  • tables.connector 描述了详细的数据源信息,path 是 book-store.csv 文件的完整路径;
  • tables.format 描述了文件内容;
  • tables.schema 描述了数据源表的表结构;
  • type 为 view 表示 MyBookView 是个视图(参考数据库的视图概念);

3、在 flink/bin 目录执行以下命令,启动SQL Client,并指定book-store.yaml为环境配置

[root@master bin]# ./sql-client.sh embedded -d ../conf/book-store.yaml
Reading default environment from: file:/hadoop_data/soft/flink-1.12.2/bin/../conf/book-store.yaml
No session environment specified.Command history file path: /root/.flink-sql-history▒▓██▓██▒▓████▒▒█▓▒▓███▓▒▓███▓░░        ▒▒▒▓██▒  ▒░██▒   ▒▒▓▓█▓▓▒░      ▒██████▒         ░▒▓███▒    ▒█▒█▒░▓█            ███   ▓░▒██▓█       ▒▒▒▒▒▓██▓░▒░▓▓██░ █   ▒▒░       ███▓▓█ ▒█▒▒▒████░   ▒▓█▓      ██▒▒▒ ▓███▒░▒█▓▓██       ▓█▒    ▓█▒▓██▓ ░█░▓░▒▓████▒ ██         ▒█    █▓░▒█▒░▒█▒███▓░██▓  ▓█           █   █▓ ▒▓█▓▓█▒░██▓  ░█░            █  █▒ ▒█████▓▒ ██▓░▒███░ ░ █░          ▓ ░█ █████▒░░    ░█░▓  ▓░██▓█ ▒▒▓▒          ▓███████▓░       ▒█▒ ▒▓ ▓██▓▒██▓ ▓█ █▓█       ░▒█████▓▓▒░         ██▒▒  █ ▒  ▓█▒▓█▓  ▓█ ██▓ ░▓▓▓▓▓▓▓▒              ▒██▓           ░█▒▓█    █ ▓███▓▒░              ░▓▓▓███▓          ░▒░ ▓███▓    ██▒    ░▒▓▓███▓▓▓▓▓██████▓▒            ▓███  █▓███▒ ███   ░▓▓▒░░   ░▓████▓░                  ░▒▓▒  █▓█▓▒▒▓▓██  ░▒▒░░░▒▒▒▒▓██▓░                            █▓██ ▓░▒█   ▓▓▓▓▒░░  ▒█▓       ▒▓▓██▓    ▓▒          ▒▒▓▓█▓ ▓▒█  █▓░  ░▒▓▓██▒            ░▓█▒   ▒▒▒░▒▒▓█████▒██░ ▓█▒█▒  ▒▓▓▒  ▓█                █░      ░░░░   ░█▒▓█   ▒█▓   ░     █░                ▒█              █▓█▓   ██         █░                 ▓▓        ▒█▓▓▓▒█░█▓ ░▓██░       ▓▒                  ▓█▓▒░░░▒▓█░    ▒███   ▓█▓░      ▒                    ░▒█▒██▒      ▓▓▓█▒   ▒█▓▒░                         ▒▒ █▒█▓▒▒░░▒██░██▒    ▒▓▓▒                     ▓██▓▒█▒ ░▓▓▓▓▒█▓░▓██▒                          ▓░  ▒█▓█  ░░▒▒▒▒▓▓▓▓▓▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒░░▓▓  ▓░▒█░______ _ _       _       _____  ____  _         _____ _ _            _  BETA   |  ____| (_)     | |     / ____|/ __ \| |       / ____| (_)          | |  | |__  | |_ _ __ | | __ | (___ | |  | | |      | |    | |_  ___ _ __ | |_ |  __| | | | '_ \| |/ /  \___ \| |  | | |      | |    | | |/ _ \ '_ \| __|| |    | | | | | |   <   ____) | |__| | |____  | |____| | |  __/ | | | |_ |_|    |_|_|_| |_|_|\_\ |_____/ \___\_\______|  \_____|_|_|\___|_| |_|\__|Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.Flink SQL> 

4、查全表

Flink SQL> show catalogs;
default_catalogFlink SQL> use catalog default_catalog;Flink SQL> show databases;
default_databaseFlink SQL> use default_database;Flink SQL> show tables;
BookStore
MyBookViewFlink SQL> select * from BookStore;

5、按照BookCatalog分组统计记录数

Flink SQL> DESCRIBE BookStore;
+-------------+--------+------+-----+--------+-----------+
|        name |   type | null | key | extras | watermark |
+-------------+--------+------+-----+--------+-----------+
|    BookName | STRING | true |     |        |           |
|  BookAmount |    INT | true |     |        |           |
| BookCatalog | STRING | true |     |        |           |
+-------------+--------+------+-----+--------+-----------+
3 rows in setFlink SQL> select BookCatalog,count(*) from BookStore group by BookCatalog;

6、查询视图

select * from MyBookView;

【一起学习】

Flink SQL Client CLI 使用入门相关推荐

  1. Flink SQL Client方言切换与datagen->Hive(DDL形式+streaming形式)

    概述 本文是对[1]的完整复现,补充了[1]中缺失的大量细节. 切换方言 切换目标 Flink SQL Client命令 切换为hive SET table.sql-dialect=hive; 切换为 ...

  2. flink sql client读取hive时卡住

    问题复现如下: 查看$FLINK_HOME/log/flink-appleyuchi-sql-client-Desktop.log 2020-12-23 11:48:56,811 INFO  org. ...

  3. Flink SQL Client讀取csv中的數據(轉載+總結)

    根據官方文檔[2] Flink SQL啓動方式 啓動命令 (1)starting an embedded standalone process $FLINK_HOME/bin/sql-client.s ...

  4. Flink SQL Client注册SCALA UDF完整流程

    UDF的完整maven工程與SQL https://github.com/appleyuchi/Flink_SQL_Client_UDF 完整操作步骤 ①mvn scala:compile packa ...

  5. Flink SQL Client注册JAVA UDF完整流程

    概述 听大佬说[1]里面有flink sql client注册udf的方法 去看了一眼,全是文字,闹心,索性琢磨了一下,记录下来. UDF的完整maven工程 https://github.com/a ...

  6. Flink SQL Client实现CDC实验

    概述 本文主要是對[7]中內容的復現 环境 组件 版本 Flink(HA) 1.12 Zookeeper 3.6.0 flink-sql-connector-mysql-cdc 1.1.1 Mysql ...

  7. Flink SQL Client进行Kafka事实表与Hbase维度表Join(纯DDL/SQL方式)

    概述: 對參考鏈接[1]進行DDL上的復現. 一些基本的業務常识   來源載體 數據特點 維表 Mysql/Csv/Hbase 很少變化 事實表 Kafka 不停變化 开发环境与准备工作 组件 版本 ...

  8. Flink SQL Client读Kafka+流计算(DDL方式+代碼嵌入DDL/SQL方式)

    #################################################################################################### ...

  9. flink sql client讀取kafka數據的timestamp(DDL方式)

    实验目的 Kafka的数据能让Flink SQL Client读取到 本文是对[1]的详细记载 具体操作步骤 ①啓動hadoop集羣,離開安全模式 ②各个节点都关闭防火墙: service firew ...

最新文章

  1. oracle里面有emp表么,Oracle自带表(EMP)SQL语句练习
  2. Linux kernel 学习笔记(1) --分段分页保护机制
  3. 简单哈弗曼树(Java)
  4. 排序算法_桶排序(箱排序)
  5. 大数据图数据库之离线挖掘计算模型
  6. 使用Spark Streaming SQL基于时间窗口进行数据统计
  7. 微信小程序 Unexpected end of JSON input
  8. 第 7 章 本地方法栈
  9. PNG in IE - 1 - pngfix.js
  10. linux下tomcat部署java web项目_在linux下用tomcat部署java web项目的过程与注意事项
  11. php动态添加属性,php – Yii2.动态添加属性和规则到模型
  12. 【matlab】从图片中截取矩形区域(手工选取/标记在原图上/截取矩形区域并保存)
  13. centos 常用的网络登录端口测试工具
  14. CRC循环冗余校验码计算器(附C++ 和Qt实现的CRC-16/MODBUS代码)
  15. PHP用GD库实现简单的验证码
  16. 微信投票系统平台大全
  17. 崔希凡-javaWeb-笔记day07-day09(2016年7月26日23:17:27)
  18. 线性代数(九) : 矩阵的行简化阶梯型和标准型
  19. 模电——三极管各个引脚之间的电压关系
  20. Conversion failed when converting date and/or time from character string.

热门文章

  1. 打算换个手机,感觉nokia还是不错的品牌选择
  2. Oracle 字符串拼接分号
  3. 牛客月赛19-皇家烈焰-(多状态dp)
  4. 金志文机器人歌叫什么_我的女友是机器人插曲叫什么名字
  5. 汇编语言练习(在树莓派4b上实现)- 基于《奔跑吧,Linux内核第二版》
  6. 好的广告片拍摄制作公司都具备哪些能力?
  7. 基于JS+CSS+HTML的跨年春节3D烟花模拟器
  8. 2018京东C++开发工程师实习面经
  9. 201900 TP-LINK WR700N V3 tplink无线路由器的那些事
  10. 【html5/css3】网站变灰是如何实现