【Alink-Python版本】学习实践-数据源、数据处理、回归、分类、聚类
【Alink】学习&实践-数据源、数据处理、回归、分类、聚类
- Alink学习链接汇总
- 1、数据源读取
- 1.1、读取CSV文件(分批流)
- 1.2、按行读入文件(分批流)
- 1.3、读取Kafka数据(流式)
- (1)部署单节点kafka(使用kafka中的zookeeper)
- (2)使用Alink流式写入/读取topic数据
- 解析json数据+sql格式转化
- 2、数据处理(边用边总结)
- 3、回归
- 3.1、线性回归
- 训练(批式)(例子:批式读取csv----批式训练)
- 预测(批式和流式)
- 1.批式预测
- 2.流式预测
- 4、分类
- 5、聚类
- 数据导出
- 导出CSV文件
- 按行导出到文件
- 导出到Mysql
Alink学习链接汇总
https://www.mianshigee.com/tutorial/Alink-1.0.1/regression.mdhttps://www.zhihu.com/people/alink_pinshu/posts?page=2
以下将采用jupyter进行学习&实践
#导入这一个包就可以了
from pyalink.alink import *
#重置运行环境,并切换到另一个环境
resetEnv()
useLocalEnv(1, flinkHome=None, config=None)
使用以下一条命令来开始使用 pyalink:设置是本地模式还是云环境
- useLocalEnv(parallelism, flinkHome=None, config=None)
- useRemoteEnv(host, port, parallelism, flinkHome=None, localIp=“localhost”, config=None)
Call resetEnv() to reset environment and switch to another. 使用
resetEnv() 来重置运行环境,并切换到另一个。
能否连接远程 Flink 集群进行计算?A:通过方法可以连接一个已经启动的 Flink 集群:useRemoteEnv(host, port, parallelism, flinkHome=None, localIp="localhost", shipAlinkAlgoJar=True, config=None)。其中,参数host 和 port 表示集群的地址;
parallelism 表示执行作业的并行度;
flinkHome 为 flink 的完整路径,默认使用 PyAlink 自带的 flink-1.9.0 路径;
localIp 指定实现 Flink DataStream 的打印预览功能时所需的本机IP地址,需要 Flink 集群能访问。默认为localhost。
shipAlinkAlgoJar 是否将 PyAlink 提供的 Alink 算法包传输给远程集群,如果远程集群已经放置了 Alink 算法包,那么这里可以设为 False,减少数据传输。
Flink-1.10 及以上版本对应的 pyalink 包,还支持类似 pyflink 脚本的远程集群运行方式。
1、数据源读取
例子中所用的文件内容如下:
6.6,3.0,4.4,1.4,Iris-versicolor
5.7,2.9,4.2,1.3,Iris-versicolor
7.7,3.8,6.7,2.2,Iris-virginica
5.1,3.8,1.9,0.4,Iris-setosa
6.2,2.9,4.3,1.3,Iris-versicolor
4.8,3.0,1.4,0.3,Iris-setosa
1.1、读取CSV文件(分批流)
filePath = 'iris.csv'
schema = 'sepal_length double, sepal_width double, petal_length double, petal_width double, category string'
csvSource = CsvSourceBatchOp()\.setFilePath(filePath)\.setSchemaStr(schema)\.setFieldDelimiter(",")
BatchOperator.collectToDataframe(csvSource)
CsvSourceBatchOp() 批运算符
设置set参数:
- filepath文件路径:可以是从本地、hdfs、http读取
schemaStr:设置列名和数据类型(这个的数据类型包含很多:https://www.mianshigee.com/tutorial/Alink-1.0.1/cn-csvsourcebatchop.md
https://zhuanlan.zhihu.com/p/103027854)
setFieldDelimiter字段分割符
skipBlankLine是否忽略空行,默认true
ignoreFirstLine是否忽略第一行数据,默认false
rowDelimiter 行分隔符,默认\nBatchOperator.collectToDataframe(csvSource)
或者写成csvSource.collectToDataframe()
每个批式数据源或批式算子都支持collectToDataframe()方法,数据结果转成Python的DataFrame形式。则可以利用Python丰富的函数库及可视化功能,进行后续的分析和显示
dataframe python数组和alink批数据源之间转换
(https://zhuanlan.zhihu.com/p/97462270)
import pandas as pdarr_2D =[['Alice',1],['Bob',2],['Cindy',3]
]df = pd.DataFrame(arr_2D)
#使用BatchOperator的fromDataFrame方法,将前面定义好的DataFrame类型变量df作为第一个参数,后面的参数用来定义数据的列名与类型,使用SchemaStr格式,即列名与其类型间用空格分隔,各列定义之间使用逗号进行分隔。
BatchOperator.fromDataframe(df, 'name string, value int').print()
filePath = 'iris.csv'
schema = 'sepal_length double, sepal_width double, petal_length double, petal_width double, category string'
csvSource = CsvSourceStreamOp()\.setFilePath(filePath)\.setSchemaStr(schema)\.setFieldDelimiter(",")
csvSource.print()
#流运算符的执行,是按照一定时间间隔,每次显示若干条
StreamOperator.execute()
print(self, key=None, refreshInterval=0, maxLimit=100)
参数:
key 为一个字符串,表示给对应的 Operator 给定一个索引;不传值时将随机生成。
refreshInterval 表示刷新时间,单位为秒。当这个值大于0时,所显示的表将每隔 refreshInterval 秒刷新,显示前 refreshInterval 的数据;当这个值小于0时,每次有新数据产生,就会在触发显示,所显示的数据项与时间无关。
maxLimit 用于控制显示的数据量,最多显示 maxLimit 条数据。
1.2、按行读入文件(分批流)
分为批流,很明显在执行的时候,感受到批式读取,直接数据展现;流式读取,数据一行一行的呈现。
我们想看一下iris数据,但不想花时间详细定义其数据列名及类型,就可以将其每条数据简单地看作一行文本,使用TextSourceBatchOp,并设置文件路径的参数。
URL = "iris.csv"
data = TextSourceBatchOp().setFilePath(URL).setTextCol("text")
data.print()
URL = "iris.csv"
data = TextSourceStreamOp().setFilePath(URL).setTextCol("text")
data.print()
StreamOperator.execute()
1.3、读取Kafka数据(流式)
(1)部署单节点kafka(使用kafka中的zookeeper)
下载kafka压缩包:
https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/
#上传到云服务器上进行解压:
##解压
tar xvf kafka_2.12-2.3.1.tgz
#以守护进程的模式进行-daemon 启动kafka里面的zookeeper:到kafka解压的包里进行执行:
/bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
#检查一下是否启动成功,QuorumPeerMain表示启动成功
jps
#启动kafka:检查一下启动是否成功,kafka占用端口为9092,zookeeper占用端口2181
/bin/kafka-server-start.sh -daemon config/server.properties
#常用命令:
#指定zookeeper的地址、端口,副本个数为1(因为我只是单机一个broker) 分区数1 和topic名称test
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test#查看top列表
bin/kafka-topics.sh --list --zookeeper localhost:2181#启动生产者,--broker-list 指定broker集群的端口 多个用逗号隔开 ,--topic指定具体的topicbin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
#启动消费者,指定broker集群的端口 和具体的topic --from-beginning代表从头开始读取消息bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning#0.9版本之前:
./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
#0.9版本之后:
./bin/kafka-console-consumer.sh --bootstrap-server 10.1.3.xxx:9092 --topic test --from-beginning
(2)使用Alink流式写入/读取topic数据
from pyalink.alink import *
resetEnv()
useLocalEnv(2, flinkHome=None, config=None)
#读取文件中的数据,查看一下数据内容
URL = "iris.csv"
SCHEMA_STR = "sepal_length double, sepal_width double, petal_length double, petal_width double, category string"
data = CsvSourceStreamOp()\
.setFilePath(URL)\
.setSchemaStr(SCHEMA_STR)\
.setFieldDelimiter(",")
data.print()
StreamOperator.execute()
#将读取到data变量的数据输入到kafka中,相当于kafka的生产者:
sink = KafkaSinkStreamOp()\
.setBootstrapServers("localhost:9092")\
.setDataFormat("json")\
.setTopic("test")data.link(sink)
StreamOperator.execute()
#消费者:利用KafkaSourceStreamOp()进行读取kafka的topic中的数据:
source = KafkaSourceStreamOp()\
.setBootstrapServers("localhost:9092")\
.setTopic("test")\
.setStartupMode("EARLIEST")\
.setGroupId("test-consumer-group")source.print(key='kafka_iris', refreshInterval=1, maxLimit=500)StreamOperator.execute()
print(self, key=None, refreshInterval=0, maxLimit=100)
参数:
key 为一个字符串,表示给对应的 Operator 给定一个索引;不传值时将随机生成。
refreshInterval 表示刷新时间,单位为秒。当这个值大于0时,所显示的表将每隔 refreshInterval 秒刷新,显示前 refreshInterval 的数据;当这个值小于0时,每次有新数据产生,就会在触发显示,所显示的数据项与时间无关。
maxLimit 用于控制显示的数据量,最多显示 maxLimit 条数据。
参考链接:
https://zhuanlan.zhihu.com/p/101143978
https://www.jianshu.com/p/7b3f8e16ee52
解析json数据+sql格式转化
根据上述程度读取的数据可以看出,message数据是json数据:我们需要对字符串里面的数据进行提取。
推荐使用JsonValueStreamOp,通过设置需要提取内容的JsonPath,提取出各列数据。JsonValueStreamOp提取出来的结果都是object类型的。
我们可以使用Flink SQL 的cast方法,在代码实现上,只需在连接JsonValueStreamOp之后,再连接SelectStreamOp并设置其SQL语句参数,
from pyalink.alink import *
useLocalEnv(1, flinkHome=None, config=None)
source = KafkaSourceStreamOp()\
.setBootstrapServers("localhost:9092")\
.setTopic("ceshi")\
.setStartupMode("EARLIEST")\
.setGroupId("test-consumer-group")
### link方法关联一些解析规则
#source.print()
data = source.link(### 解析成Json数据JsonValueStreamOp()\.setSelectedCol("message").setReservedCols([]).setOutputCols(["sepal_length", "sepal_width", "petal_length","petal_width", "category"]).setJsonPath(["$.sepal_length", "$.sepal_width",
"$.petal_length", "$.petal_width","$.category"])
).link(### 按照sql select格式转化SelectStreamOp()\.setClause("CAST(sepal_length AS DOUBLE) AS sepal_length, "\+ "CAST(sepal_width AS DOUBLE) AS sepal_width, "\+ "CAST(petal_length AS DOUBLE) AS petal_length, "\+ "CAST(petal_width AS DOUBLE) AS petal_width, category")
)data.print()
StreamOperator.execute()
2、数据处理(边用边总结)
3、回归
下面例子所用的文件data.csv:
50.186389494880601,69.847604158249183
52.378446219236217,86.098291205774103
50.135485486286122,59.108839267699643
33.644706006191782,69.89968164362763
39.557901222906828,44.862490711164398
56.130388816875467,85.498067778840223
57.362052133238237,95.536686846467219
60.269214393997906,70.251934419771587
35.678093889410732,52.721734964774988
31.588116998132829,50.392670135079896
53.66093226167304,63.642398775657753
46.682228649471917,72.247251068662365
43.107820219102464,57.812512976181402
70.34607561504933,104.25710158543822
44.492855880854073,86.642020318822006
3.1、线性回归
参考链接:
https://www.cnblogs.com/qiu-hua/p/14891409.html
流式读取csv----批式训练—批式预测
------------------流式读取的数据,无法在批式训练-----------
流式读取csv------流式预测
-------流式读取的数据类型就是 流式预测传入的数据类型格式,可以直接用训练好的模型---------
1、批式读取csv----批式训练—批式预测
2.批式读取csv----批式训练—流式预测
批式读取的数据是批式运算符BatchOperator 类型变量,可以直接进行训练,但是预测时需要转化为流数据:
先将BatchOperator —>>dataframe----->>StreamOperator
(1).先将BatchOperator —>>dataframe:
newdata=data.collectToDataframe()
newdata=BatchOperator.collectToDataframe(data)
(2).dataframe----->>StreamOperator:
streamData=StreamOperator.fromDataframe(newdata, schema)
streamData=dataframeToOperator(newdata, schemaStr=“col double,label double”, opType=“stream”)
训练(批式)(例子:批式读取csv----批式训练)
from pyalink.alink import *
resetEnv()
useLocalEnv(1, flinkHome=None, config=None)
批式读取csv:
schema='col double,label double'
data=CsvSourceBatchOp()\
.setFilePath("data.csv")\
.setSchemaStr(schema).setFieldDelimiter(",")
直接训练:
colnames=["col"]
lr=LinearRegTrainBatchOp().setFeatureCols(colnames).setLabelCol("label")
model=data.link(lr)
预测(批式和流式)
1.批式预测
批式读取csv:
schema='col double,label double'
data=CsvSourceBatchOp()\
.setFilePath("data.csv")\
.setSchemaStr(schema).setFieldDelimiter(",")
直接训练:
colnames=["col"]
lr=LinearRegTrainBatchOp().setFeatureCols(colnames).setLabelCol("label")
model=data.link(lr)
批式预测
predictor = LinearRegPredictBatchOp().setPredictionCol("pred")
predictor.linkFrom(model, data).firstN(5).print()
2.流式预测
批式读取csv:
schema='col double,label double'
data=CsvSourceBatchOp()\
.setFilePath("data.csv")\
.setSchemaStr(schema).setFieldDelimiter(",")
直接训练:
colnames=["col"]
lr=LinearRegTrainBatchOp().setFeatureCols(colnames).setLabelCol("label")
model=data.link(lr)
转换batchop---->>---dataframe----->>streamopdata
newdata=BatchOperator.collectToDataframe(data)或者newdata=data.collectToDataframe()streamData=dataframeToOperator(newdata, schemaStr="col double,label double", opType="stream")或者streamData=StreamOperator.fromDataframe(newdata, ‘col double,label double’)
流式预测:
predictor = LinearRegPredictStreamOp(model).setPredictionCol("pred")
predictor.linkFrom(streamData).print()
StreamOperator.execute()
4、分类
5、聚类
数据导出
导出CSV文件
按行导出到文件
导出到Mysql
【Alink-Python版本】学习实践-数据源、数据处理、回归、分类、聚类相关推荐
- Python数据挖掘学习笔记】九.回归模型LinearRegression简单分析氧化物数据
#2018-03-23 16:26:20 March Friday the 12 week, the 082 day SZ SSMR [Python数据挖掘学习笔记]九.回归模型LinearRegre ...
- python分类算法的应用_07-机器学习_(lineage回归分类算法与应用) ---没用
机器学习算法day04_Logistic回归分类算法及应用 课程大纲 Logistic回归分类算法原理 Logistic回归分类算法概述 Logistic回归分类算法思想 Logistic回归分类算法 ...
- 【Python爬虫学习实践】基于BeautifulSoup的网站解析及数据可视化
在上一次的学习实践中,我们以Tencent职位信息网站为例,介绍了在爬虫中如何分析待解析的网站结构,同时也说明了利用Xpath和lxml解析网站的一般化流程.在本节的实践中,我们将以中国天气网为例,并 ...
- python dlib学习(九):人脸聚类
前言 前面的博客介绍过使用dlib进行人脸检测.比对.检测特征点等等操作. python dlib学习(一):人脸检测 python dlib学习(二):人脸特征点标定 python dlib学习(五 ...
- 【Python爬虫学习实践】多线程爬取Bing每日壁纸
在本节实践中,我们将借助Python多线程编程并采用生产者消费者模式来编写爬取Bing每日壁纸的爬虫.在正式编程前,我们还是一样地先来分析一下我们的需求及大体实现的过程. 总体设计预览 首先,我们先来 ...
- playwright python版本学习五-->locator常用定位总结
playwright locator 定位方法实践,依据 官方文档 Text selector–文本定位 单独使用文本 1, text = "xxx"这种写法,是需要文本完全匹配 ...
- python爬虫学习实践(一):requests库和正则表达式之淘宝爬虫实战
使用requests库是需要安装的,requests库相比urllib 库来说更高级方便一点,同时与scrapy相比较还是不够强大,本文主要介绍利用requests库和正则表达式完成一项简单的爬虫小项 ...
- 机器学习算法与Python实践之逻辑回归(Logistic Regression)
转载自:http://blog.csdn.net/zouxy09/article/details/20319673 机器学习算法与Python实践这个系列主要是参考<机器学习实战>这本书. ...
- python爬去百度百科词条_Python爬虫入门学习实践——爬取小说
本学期开始接触python,python是一种面向对象的.解释型的.通用的.开源的脚本编程语言,我觉得python最大的优点就是简单易用,学习起来比较上手,对代码格式的要求没有那么严格,这种风格使得我 ...
最新文章
- Tensorflow MNIST浅层神经网络的解释和答复
- kubernetes 清理日志命令_Kubernetes之容器数据写满磁盘解决方法
- QT的QBrush类的使用
- 音视频技术开发周刊 | 161
- 2-4 测试案例helloWorld
- 快速傅里叶变换之后的结果含义
- python函数-函数进阶
- Python遍历文件夹获取文件名并写入excel
- 【java学习之路】(java框架)002.Git配置及使用
- C++类引用中的构造函数与析构函数的执行顺序练习
- php程序员工具箱v0.6,php程序员工具箱官方版
- 京东客小程序功能模块源码V6.0.2
- uview ui与element ui的区别和用法
- 数据分析--企业的贤内助 附下载地址
- Java入门基本数据类型(羊驼)
- 涉密台式计算机密码可以输入几次,涉密打印机、扫描仪等与涉密计算机之间不采用无线方式连接 - 作业在线问答...
- matlab 设置坐标轴位置/方向 y轴反向
- 博基计划(4)---近红外光谱过程分析中基线漂移的主要来源
- POJ 2431 丛林探险(优先队列)
- 分享具体制作流程,利用下班时间听歌,昨天挣了400多