参考:https://nightlies.apache.org/flink/flink-docs-release-1.13/api/python/index.html

pyflink是什么

  1. 数据流处理的框架
  2. 这个框架是同时运行在多台主机上
  3. 通过某种方式这多台主机之间可以通信
  4. 可以单机运行
  5. pyflink只是对java的flink的一个调用工具,不能直接用python来对sourcesink组件进行实现。

Apache Flink是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。

PyFlink 架构

PyFlink 的核心目标:

1.将 Flink 能力输出到 Python 用户,进而可以让 Python 用户使用所有的 Flink 能力。

2.将 Python 生态现有的分析计算功能运行到 Flink 上,进而增强 Python 生态对大数据问题的解决能力。

应用场景

  • 第一个,事件驱动型,比如:刷单,监控等;
  • 第二个,数据分析型的,比如:库存,双11大屏等;
  • 第三个适用的场景是数据管道,也就是ETL场景,比如一些日志的解析等;
  • 第四个场景,机器学习,比如个性推荐等。

API

Flink 为流/批处理应用程序提供了不同级别的抽象。

  • SQL
  • Table API
  • DataStream/DataSet API(核心 API)
  • Stateful Stream Processing

PyFlink API 完全与 Java Table API 对齐,各种关系操作都支持,同时对 window 也有很好的支持,除了这些 APIs,PyFlink还提供多种定义 Python UDF 的方式.

UDF自定义函数

首先,可以扩展 ScalarFunction,这种方式可以提供更多的辅助功能,比如添加 Metrics 。除此之外 Python 语言所支持的任何方式的方法定义,在 PyFlink UDF 中都是支持的,比如:Lambda Function,Named Function 和 CallableFunction等。

当定义完方法后,用 PyFlink 所提供的 Decorators 进行打标,并描述 input 和 output 的数据类型就可以了。后面版本也可以根据 Python 语言的 type hint 特性再进一步简化,进行类型推导。

from pyflink.table import ScalarFunction, DataTypes
from pyflink.table.udf import udf# Extend ScalarFunction
class ADD(ScalarFunction):def eval(self, i ,j):return i + jadd1 = udf (ADD(), [DataTypes.BIGINT(), DataTypes.BIGINT()] ,DataTypes.BIGINT())# Named function
@udf(input_types =  [DataTypes.BIGINT(), DataTypes.BIGINT() ], result_type = DataTypes.BIGINT())
def add2( i ,j):return i + j# Lambda function
add3 = udf(lambda i,j :i+j, [DataTypes.BIGINT(), DataTypes.BIGINT() ], result_type = DataTypes.BIGINT())# Callable Function
class CallableAdd(object):def __call__(self, i,j):return i + jadd4 = udf(CallableAdd(),[DataTypes.BIGINT(), DataTypes.BIGINT() ], result_type = DataTypes.BIGINT())

pyflink安装

pip3  install -i https://pypi.tuna.tsinghua.edu.cn/simple/ apache-flink==1.13.5

最好指定版本,如1.13.2

实战

apache-flink 1.13.2

读取iceberg数据

pyflink安装目录/lib

flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar

hive-exec-3.1.2.jar

alluxio-2.6.2-client.jar

iceberg-flink-runtime-1.13-0.13.1.jar

import osfrom pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings, TableEnvironment,StreamTableEnvironmentos.environ.setdefault('HADOOP_USER_NAME', 'root')env = StreamExecutionEnvironment.get_execution_environment()
env_settings  = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(env,environment_settings=env_settings )t_env.get_config().get_configuration().set_string("parallelism.default", "1")
# t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed", True)iceberg_hive_catalog = """CREATE CATALOG iceberg WITH('type'='iceberg','catalog-type'='hive' -- 可选 catalog类型 hive、hadoop、custom,'property-version'='1' -- 可选 属性版本号,可向后兼容,目前版本号为1,'cache-enabled' = 'true' -- 可选 是否启用catalog缓存 默认为true,'uri'='thrift://192.168.xxx.xxx:9083' -- 必填 hive 元数据存储连接,'clients'='5' -- hive metastore clients连接池大小,默认为2,'warehouse'='hdfs://ns1/lakehouse/')
""" t_env.get_current_catalog()
t_env.get_current_database()
# t_env.execute_sql(iceberg_hive_catalog).print()t_env.execute_sql("use catalog iceberg").print()t_env.execute_sql("show current catalog").print()
#
t_env.execute_sql("show databases").print()t_env.execute_sql("use dbname").print()t_env.execute_sql("show tables").print()table1 = t_env.execute_sql("select * from  ***")table2 = t_env.sql_query("select * from  xxx")pd = table2.to_pandas()

读取mysql数据

配置

pyflink安装目录/lib

mysql-connector-java-8.0.16.jar

flink-connector-jdbc_2.12-1.13.2.jar(https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc_2.12/1.13.2)

mysql创表

CREATE TABLE `flink_test` (`f0` int(11) DEFAULT NULL,`f1` int(11) DEFAULT NULL
) insert into flink_test VALUES(1,11)
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
from pyflink.table import EnvironmentSettings, TableEnvironment
# create environment
from pyflink.table.expressions import litsettings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
t_env = TableEnvironment.create(settings)mysql_sink_ddl = """
CREATE TABLE flink_test (id BIGINT,word VARCHAR,`count` BIGINT,PRIMARY KEY (id) NOT ENFORCED) WITH ('connector.type' = 'jdbc', -- 使用 jdbc connector'connector.url' = 'jdbc:mysql://192.168.xx.xx:3306/test','connector.table' = 'flink_test','connector.username' = 'xxx','connector.password' = 'xxx','connector.write.flush.interval' = '1s'
)
"""mysql_sink_ddl = """
create table flink_test (
f0 INT,
f1 INT
) WITH (
'connector' = 'jdbc', -- 使用 jdbc connector'url' = 'jdbc:mysql://192.168.xx.xxx:3306/test','username' = 'xx','table-name' = 'flink_test','password' = 'xx')
"""t_env.execute_sql(mysql_sink_ddl)
table = t_env.execute_sql("select * from flink_test")
# +-------------+-------------+
# |          f0 |          f1 |
# +-------------+-------------+
# |           1 |          11 |
# +-------------+-------------+
# 1 row in set
table = t_env.execute_sql("insert into  flink_test values(2,22)")table2 = t_env.sql_query('select * from flink_test')
# tab = t_env.from_path('flink_test')
table2.to_pandas()
#    f0  f1
# 0   1  11
# 1   2  22

读取一个 csv 文件,计算词频,并将结果写到一个结果文件中

文件名:word_count.py

参考:https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/dev/python/table_api_tutorial/

from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.expressions import lit# 创建TableEnvironment 。这是Python Table API作业的入口类。
settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
t_env = TableEnvironment.create(settings)# write all the data to one file
t_env.get_config().get_configuration().set_string("parallelism.default", "1")# 创建源表和结果表,在ExecutionEnvironment中注册表名分别为mySource和mySink的表。
t_env.connect(FileSystem().path('./input/word_count.csv')) \.with_format(OldCsv().field('word', DataTypes.STRING())) \.with_schema(Schema().field('word', DataTypes.STRING())) \.create_temporary_table('mySource')t_env.connect(FileSystem().path('./ouput/output.csv')) \.with_format(OldCsv().field_delimiter('\t').field('word', DataTypes.STRING()).field('count', DataTypes.BIGINT())) \.with_schema(Schema().field('word', DataTypes.STRING()).field('count', DataTypes.BIGINT())) \.create_temporary_table('mySink')# 下列代码实现的输出文件名是乱码,'/ouput/output.csv'被当作目录
'''
my_source_ddl = """create table mySource (word VARCHAR) with ('connector' = 'filesystem','format' = 'csv','path' = './input/word_count.csv')
"""my_sink_ddl = """create table mySink (word VARCHAR,`count` BIGINT) with ('connector' = 'filesystem','format' = 'csv','path' = '/ouput/output.csv')
"""
t_env.execute_sql(my_source_ddl)
t_env.execute_sql(my_sink_ddl)
'''
# 该作业读取表mySource中的数据
tab = t_env.from_path('mySource')
# 启动Flink Python Table API作业
# 当execute_insert(sink_name)被调用的时候, 作业才会被真正提交到集群或者本地进行执行。
tab.group_by(tab.word) \.select(tab.word, lit(1).count) \.execute_insert('mySink').wait()

报错

Could not find any factory for identifier ‘jdbc‘

flink-connector-jdbc_2.12-1.13.2.jar包没有放入指定位置

org.apache.flink.sql.parser.impl.ParseException: Encountered “)” at line 12, column 1.

建表语句最后一个括号前多了个逗号

参考

实例:

csv读入参数:https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/formats/csv/

使用mysql_Flink 使用python连接mysql: https://blog.csdn.net/weixin_32136203/article/details/112684291

pyflink连接iceberg 实践相关推荐

  1. Flinksql读取Kafka写入Iceberg 实践亲测

    Flink sql实时读取Kafka写入Iceberg 实践亲测 前言 本文记录了使用HDFS的一个路径作为iceberg 的结果表,使用Flink sql实时消费kafka中的数据并写入iceber ...

  2. demo flink写入kafka_Flink结合Kafka实时写入Iceberg实践笔记

    前言 上文提到使用Flink SQL写入hadoop catalog 的iceberg table 的简单示例,这次我就flink 消费kafka 流式写入iceberg table做一个验证,现记录 ...

  3. Flink结合Kafka实时写入Iceberg实践笔记

    前言 上文提到使用Flink SQL写入hadoop catalog 的iceberg table 的简单示例,这次我就flink 消费kafka 流式写入iceberg table做一个验证,现记录 ...

  4. 石墨文档Websocket百万长连接技术实践

    内容简介:Web 服务端推送技术经过了长轮询.短轮询的发展,最终到 HTML5 标准带来的 WebSocket 规范逐步成为了目前业内主流技术方案.它使得消息推送.消息通知等功能的实现变得异常简单,那 ...

  5. 石墨文档 Websocket 百万长连接技术实践

    引言 在石墨文档的部分业务中,例如文档分享.评论.幻灯片演示和文档表格跟随等场景,涉及到多客户端数据同步和服务端批量数据推送的需求,一般的 HTTP 协议无法满足服务端主动 Push 数据的场景,因此 ...

  6. Trino安装部署连接iceberg

    ** 一.Trino介绍 ** Trino是针对OLAP设计的用于高效的分布式查询大量数据的分析引擎.主要具备下列优点: 屏蔽底层数据源,提供统一查询接口: 基于内存计算,可以跨不同数据源完成联邦查询 ...

  7. jdbc连接池连不上mysql80_JDBC MySql连接池实践可避免连接池耗尽-问答-阿里云开发者社区-阿里云...

    异常表明应用程序代码的典型情况是数据库连接泄漏.你需要确保你获得和关闭所有的人(Connection,Statement 和 ResultSet)在try-with-resources按照正常的JDB ...

  8. android 实现蓝牙自动配对连接,Android实践 -- Android蓝牙设置连接

    蓝牙开发相关 使用Android Bluetooth APIs将设备通过蓝牙连接并通信,设置蓝牙,查找蓝牙设备,配对蓝牙设备 连接并传输数据,以下是Android系统提供的蓝牙相关的类和接口 Blue ...

  9. Android L2TP 一直连接失败,但PPTP可连接,实践解决方案

    最近公司为使用虚拟专用网下载内部文件,考虑到PPTP需使用GRE协议可能被封,openVΡN需要另加软件,于是便采用L2PT/IPsec协议.本来很顺利的,电脑测试连接成功,也就没有管了. 过了几天, ...

最新文章

  1. MongoDB 索引
  2. barrier linux,Linux系统上启用barrier?
  3. 4固定在底部_自建房不搭彩钢棚,4根钢结构撑个玻璃棚遮风挡雨,上面多个露台...
  4. python.freelycode.com-最难搞的python“反面”代码
  5. java四种访问权限
  6. python在工厂中的应用_Python工厂方法
  7. arduino的IDE开发stm32的板子
  8. python爬取网易云音乐问题陈述_python 网易云音乐 评论爬取问题
  9. 广义表及其存储方式简介
  10. LeetCode 439. 三元表达式解析器
  11. tableView中deselectRowAtIndexPath的作用
  12. 02.vs2015编译qt动态库
  13. 小程序快速入门:小程序的基本结构
  14. java游戏运行_用jar包运行带GUI的java游戏
  15. 二手手机验机教程(不拆机)
  16. 视频转成gif动图怎么操作?仅需三步在线完成视频转gif
  17. 高级运维工程师打怪升级之路
  18. win10资源管理器无法最小化,无法移动
  19. Python文本彩色图像去污
  20. java访问文件服务器,java远程服务器访问本地文件

热门文章

  1. 【第一期】看图识车站,快来看看你是不是资深铁路迷
  2. Android学习日记 Notification 通知
  3. 树莓派安装hp打印机
  4. 英诺森发布InStock2.0智能仓储管理平台,助力企业数字化转型
  5. 从微信错误报告窥探个人隐私
  6. python就业前景和工资待遇怎么样
  7. 巩固知识体系!西门子万人裁员:35岁之后找不到工作怎么办
  8. 采用Open3d绘制高度颜色点云图
  9. 黑盒测试方法详细介绍
  10. 大前端html菜单栏,9款精美别致的CSS3菜单和按钮