【Alink】学习&实践-数据源、数据处理、回归、分类、聚类

  • Alink学习链接汇总
    • 1、数据源读取
      • 1.1、读取CSV文件(分批流)
      • 1.2、按行读入文件(分批流)
      • 1.3、读取Kafka数据(流式)
        • (1)部署单节点kafka(使用kafka中的zookeeper)
        • (2)使用Alink流式写入/读取topic数据
          • 解析json数据+sql格式转化
    • 2、数据处理(边用边总结)
    • 3、回归
      • 3.1、线性回归
        • 训练(批式)(例子:批式读取csv----批式训练)
        • 预测(批式和流式)
          • 1.批式预测
          • 2.流式预测
    • 4、分类
    • 5、聚类
    • 数据导出
      • 导出CSV文件
      • 按行导出到文件
      • 导出到Mysql

Alink学习链接汇总

https://www.mianshigee.com/tutorial/Alink-1.0.1/regression.mdhttps://www.zhihu.com/people/alink_pinshu/posts?page=2

以下将采用jupyter进行学习&实践

#导入这一个包就可以了
from pyalink.alink import *
#重置运行环境,并切换到另一个环境
resetEnv()
useLocalEnv(1, flinkHome=None, config=None)

使用以下一条命令来开始使用 pyalink:设置是本地模式还是云环境

  • useLocalEnv(parallelism, flinkHome=None, config=None)
  • useRemoteEnv(host, port, parallelism, flinkHome=None, localIp=“localhost”, config=None)
    Call resetEnv() to reset environment and switch to another. 使用
    resetEnv() 来重置运行环境,并切换到另一个。
能否连接远程 Flink 集群进行计算?A:通过方法可以连接一个已经启动的 Flink 集群:useRemoteEnv(host, port, parallelism, flinkHome=None, localIp="localhost", shipAlinkAlgoJar=True, config=None)。其中,参数host 和 port 表示集群的地址;
parallelism 表示执行作业的并行度;
flinkHome 为 flink 的完整路径,默认使用 PyAlink 自带的 flink-1.9.0 路径;
localIp 指定实现 Flink DataStream 的打印预览功能时所需的本机IP地址,需要 Flink 集群能访问。默认为localhost。
shipAlinkAlgoJar 是否将 PyAlink 提供的 Alink 算法包传输给远程集群,如果远程集群已经放置了 Alink 算法包,那么这里可以设为 False,减少数据传输。
Flink-1.10 及以上版本对应的 pyalink 包,还支持类似 pyflink 脚本的远程集群运行方式。

1、数据源读取

例子中所用的文件内容如下:

6.6,3.0,4.4,1.4,Iris-versicolor
5.7,2.9,4.2,1.3,Iris-versicolor
7.7,3.8,6.7,2.2,Iris-virginica
5.1,3.8,1.9,0.4,Iris-setosa
6.2,2.9,4.3,1.3,Iris-versicolor
4.8,3.0,1.4,0.3,Iris-setosa

1.1、读取CSV文件(分批流)


filePath = 'iris.csv'
schema = 'sepal_length double, sepal_width double, petal_length double, petal_width double, category string'
csvSource = CsvSourceBatchOp()\.setFilePath(filePath)\.setSchemaStr(schema)\.setFieldDelimiter(",")
BatchOperator.collectToDataframe(csvSource)

CsvSourceBatchOp() 批运算符
设置set参数:

  • filepath文件路径:可以是从本地、hdfs、http读取
    schemaStr:设置列名和数据类型(这个的数据类型包含很多:https://www.mianshigee.com/tutorial/Alink-1.0.1/cn-csvsourcebatchop.md
    https://zhuanlan.zhihu.com/p/103027854)
    setFieldDelimiter字段分割符
    skipBlankLine是否忽略空行,默认true
    ignoreFirstLine是否忽略第一行数据,默认false
    rowDelimiter 行分隔符,默认\n

BatchOperator.collectToDataframe(csvSource)
或者写成csvSource.collectToDataframe()
每个批式数据源或批式算子都支持collectToDataframe()方法,数据结果转成Python的DataFrame形式。则可以利用Python丰富的函数库及可视化功能,进行后续的分析和显示

dataframe    python数组和alink批数据源之间转换
(https://zhuanlan.zhihu.com/p/97462270)
import pandas as pdarr_2D =[['Alice',1],['Bob',2],['Cindy',3]
]df = pd.DataFrame(arr_2D)
#使用BatchOperator的fromDataFrame方法,将前面定义好的DataFrame类型变量df作为第一个参数,后面的参数用来定义数据的列名与类型,使用SchemaStr格式,即列名与其类型间用空格分隔,各列定义之间使用逗号进行分隔。
BatchOperator.fromDataframe(df, 'name string, value int').print()


filePath = 'iris.csv'
schema = 'sepal_length double, sepal_width double, petal_length double, petal_width double, category string'
csvSource = CsvSourceStreamOp()\.setFilePath(filePath)\.setSchemaStr(schema)\.setFieldDelimiter(",")
csvSource.print()
#流运算符的执行,是按照一定时间间隔,每次显示若干条
StreamOperator.execute()

print(self, key=None, refreshInterval=0, maxLimit=100)
参数:
key 为一个字符串,表示给对应的 Operator 给定一个索引;不传值时将随机生成。
refreshInterval 表示刷新时间,单位为秒。当这个值大于0时,所显示的表将每隔 refreshInterval 秒刷新,显示前 refreshInterval 的数据;当这个值小于0时,每次有新数据产生,就会在触发显示,所显示的数据项与时间无关。
maxLimit 用于控制显示的数据量,最多显示 maxLimit 条数据。

1.2、按行读入文件(分批流)

分为批流,很明显在执行的时候,感受到批式读取,直接数据展现;流式读取,数据一行一行的呈现。
我们想看一下iris数据,但不想花时间详细定义其数据列名及类型,就可以将其每条数据简单地看作一行文本,使用TextSourceBatchOp,并设置文件路径的参数。

URL = "iris.csv"
data = TextSourceBatchOp().setFilePath(URL).setTextCol("text")
data.print()


URL = "iris.csv"
data = TextSourceStreamOp().setFilePath(URL).setTextCol("text")
data.print()
StreamOperator.execute()

1.3、读取Kafka数据(流式)

(1)部署单节点kafka(使用kafka中的zookeeper)

下载kafka压缩包:
https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/
#上传到云服务器上进行解压:
##解压
tar xvf kafka_2.12-2.3.1.tgz
#以守护进程的模式进行-daemon 启动kafka里面的zookeeper:到kafka解压的包里进行执行:
/bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
#检查一下是否启动成功,QuorumPeerMain表示启动成功
jps
#启动kafka:检查一下启动是否成功,kafka占用端口为9092,zookeeper占用端口2181
/bin/kafka-server-start.sh -daemon  config/server.properties
#常用命令:
#指定zookeeper的地址、端口,副本个数为1(因为我只是单机一个broker) 分区数1 和topic名称test
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test#查看top列表
bin/kafka-topics.sh --list --zookeeper localhost:2181#启动生产者,--broker-list 指定broker集群的端口 多个用逗号隔开 ,--topic指定具体的topicbin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
#启动消费者,指定broker集群的端口 和具体的topic --from-beginning代表从头开始读取消息bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning#0.9版本之前:
./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
#0.9版本之后:
./bin/kafka-console-consumer.sh --bootstrap-server 10.1.3.xxx:9092 --topic test --from-beginning

(2)使用Alink流式写入/读取topic数据

from pyalink.alink import *
resetEnv()
useLocalEnv(2, flinkHome=None, config=None)
#读取文件中的数据,查看一下数据内容
URL = "iris.csv"
SCHEMA_STR = "sepal_length double, sepal_width double, petal_length double, petal_width double, category string"
data = CsvSourceStreamOp()\
.setFilePath(URL)\
.setSchemaStr(SCHEMA_STR)\
.setFieldDelimiter(",")
data.print()
StreamOperator.execute()

#将读取到data变量的数据输入到kafka中,相当于kafka的生产者:
sink = KafkaSinkStreamOp()\
.setBootstrapServers("localhost:9092")\
.setDataFormat("json")\
.setTopic("test")data.link(sink)
StreamOperator.execute()
#消费者:利用KafkaSourceStreamOp()进行读取kafka的topic中的数据:
source = KafkaSourceStreamOp()\
.setBootstrapServers("localhost:9092")\
.setTopic("test")\
.setStartupMode("EARLIEST")\
.setGroupId("test-consumer-group")source.print(key='kafka_iris', refreshInterval=1, maxLimit=500)StreamOperator.execute()

print(self, key=None, refreshInterval=0, maxLimit=100)
参数:
key 为一个字符串,表示给对应的 Operator 给定一个索引;不传值时将随机生成。
refreshInterval 表示刷新时间,单位为秒。当这个值大于0时,所显示的表将每隔 refreshInterval 秒刷新,显示前 refreshInterval 的数据;当这个值小于0时,每次有新数据产生,就会在触发显示,所显示的数据项与时间无关。
maxLimit 用于控制显示的数据量,最多显示 maxLimit 条数据。

参考链接:
https://zhuanlan.zhihu.com/p/101143978
https://www.jianshu.com/p/7b3f8e16ee52
解析json数据+sql格式转化
根据上述程度读取的数据可以看出,message数据是json数据:我们需要对字符串里面的数据进行提取。
推荐使用JsonValueStreamOp,通过设置需要提取内容的JsonPath,提取出各列数据。JsonValueStreamOp提取出来的结果都是object类型的。
我们可以使用Flink SQL 的cast方法,在代码实现上,只需在连接JsonValueStreamOp之后,再连接SelectStreamOp并设置其SQL语句参数,

from pyalink.alink import *
useLocalEnv(1, flinkHome=None, config=None)
source = KafkaSourceStreamOp()\
.setBootstrapServers("localhost:9092")\
.setTopic("ceshi")\
.setStartupMode("EARLIEST")\
.setGroupId("test-consumer-group")
### link方法关联一些解析规则
#source.print()
data = source.link(### 解析成Json数据JsonValueStreamOp()\.setSelectedCol("message").setReservedCols([]).setOutputCols(["sepal_length", "sepal_width", "petal_length","petal_width", "category"]).setJsonPath(["$.sepal_length", "$.sepal_width",
"$.petal_length", "$.petal_width","$.category"])
).link(### 按照sql select格式转化SelectStreamOp()\.setClause("CAST(sepal_length AS DOUBLE) AS sepal_length, "\+ "CAST(sepal_width AS DOUBLE) AS sepal_width, "\+ "CAST(petal_length AS DOUBLE) AS petal_length, "\+ "CAST(petal_width AS DOUBLE) AS petal_width, category")
)data.print()
StreamOperator.execute()

2、数据处理(边用边总结)

3、回归

下面例子所用的文件data.csv:

50.186389494880601,69.847604158249183
52.378446219236217,86.098291205774103
50.135485486286122,59.108839267699643
33.644706006191782,69.89968164362763
39.557901222906828,44.862490711164398
56.130388816875467,85.498067778840223
57.362052133238237,95.536686846467219
60.269214393997906,70.251934419771587
35.678093889410732,52.721734964774988
31.588116998132829,50.392670135079896
53.66093226167304,63.642398775657753
46.682228649471917,72.247251068662365
43.107820219102464,57.812512976181402
70.34607561504933,104.25710158543822
44.492855880854073,86.642020318822006

3.1、线性回归

参考链接:
https://www.cnblogs.com/qiu-hua/p/14891409.html

流式读取csv----批式训练—批式预测
------------------流式读取的数据,无法在批式训练-----------
流式读取csv------流式预测
-------流式读取的数据类型就是 流式预测传入的数据类型格式,可以直接用训练好的模型---------
1、批式读取csv----批式训练—批式预测

2.批式读取csv----批式训练—流式预测
批式读取的数据是批式运算符BatchOperator 类型变量,可以直接进行训练,但是预测时需要转化为流数据:
先将BatchOperator —>>dataframe----->>StreamOperator
(1).先将BatchOperator —>>dataframe:
newdata=data.collectToDataframe()
newdata=BatchOperator.collectToDataframe(data)
(2).dataframe----->>StreamOperator:
streamData=StreamOperator.fromDataframe(newdata, schema)
streamData=dataframeToOperator(newdata, schemaStr=“col double,label double”, opType=“stream”)

训练(批式)(例子:批式读取csv----批式训练)

from pyalink.alink import *
resetEnv()
useLocalEnv(1, flinkHome=None, config=None)
批式读取csv:
schema='col double,label double'
data=CsvSourceBatchOp()\
.setFilePath("data.csv")\
.setSchemaStr(schema).setFieldDelimiter(",")
直接训练:
colnames=["col"]
lr=LinearRegTrainBatchOp().setFeatureCols(colnames).setLabelCol("label")
model=data.link(lr)

预测(批式和流式)

1.批式预测
批式读取csv:
schema='col double,label double'
data=CsvSourceBatchOp()\
.setFilePath("data.csv")\
.setSchemaStr(schema).setFieldDelimiter(",")
直接训练:
colnames=["col"]
lr=LinearRegTrainBatchOp().setFeatureCols(colnames).setLabelCol("label")
model=data.link(lr)
批式预测
predictor = LinearRegPredictBatchOp().setPredictionCol("pred")
predictor.linkFrom(model, data).firstN(5).print()
2.流式预测
批式读取csv:
schema='col double,label double'
data=CsvSourceBatchOp()\
.setFilePath("data.csv")\
.setSchemaStr(schema).setFieldDelimiter(",")
直接训练:
colnames=["col"]
lr=LinearRegTrainBatchOp().setFeatureCols(colnames).setLabelCol("label")
model=data.link(lr)
转换batchop---->>---dataframe----->>streamopdata
newdata=BatchOperator.collectToDataframe(data)或者newdata=data.collectToDataframe()streamData=dataframeToOperator(newdata, schemaStr="col double,label double", opType="stream")或者streamData=StreamOperator.fromDataframe(newdata, ‘col double,label double’)
流式预测:
predictor = LinearRegPredictStreamOp(model).setPredictionCol("pred")
predictor.linkFrom(streamData).print()
StreamOperator.execute()

4、分类

5、聚类

数据导出

导出CSV文件

按行导出到文件

导出到Mysql

【Alink-Python版本】学习实践-数据源、数据处理、回归、分类、聚类相关推荐

  1. Python数据挖掘学习笔记】九.回归模型LinearRegression简单分析氧化物数据

    #2018-03-23 16:26:20 March Friday the 12 week, the 082 day SZ SSMR [Python数据挖掘学习笔记]九.回归模型LinearRegre ...

  2. python分类算法的应用_07-机器学习_(lineage回归分类算法与应用) ---没用

    机器学习算法day04_Logistic回归分类算法及应用 课程大纲 Logistic回归分类算法原理 Logistic回归分类算法概述 Logistic回归分类算法思想 Logistic回归分类算法 ...

  3. 【Python爬虫学习实践】基于BeautifulSoup的网站解析及数据可视化

    在上一次的学习实践中,我们以Tencent职位信息网站为例,介绍了在爬虫中如何分析待解析的网站结构,同时也说明了利用Xpath和lxml解析网站的一般化流程.在本节的实践中,我们将以中国天气网为例,并 ...

  4. python dlib学习(九):人脸聚类

    前言 前面的博客介绍过使用dlib进行人脸检测.比对.检测特征点等等操作. python dlib学习(一):人脸检测 python dlib学习(二):人脸特征点标定 python dlib学习(五 ...

  5. 【Python爬虫学习实践】多线程爬取Bing每日壁纸

    在本节实践中,我们将借助Python多线程编程并采用生产者消费者模式来编写爬取Bing每日壁纸的爬虫.在正式编程前,我们还是一样地先来分析一下我们的需求及大体实现的过程. 总体设计预览 首先,我们先来 ...

  6. playwright python版本学习五-->locator常用定位总结

    playwright locator 定位方法实践,依据 官方文档 Text selector–文本定位 单独使用文本 1, text = "xxx"这种写法,是需要文本完全匹配 ...

  7. python爬虫学习实践(一):requests库和正则表达式之淘宝爬虫实战

    使用requests库是需要安装的,requests库相比urllib 库来说更高级方便一点,同时与scrapy相比较还是不够强大,本文主要介绍利用requests库和正则表达式完成一项简单的爬虫小项 ...

  8. 机器学习算法与Python实践之逻辑回归(Logistic Regression)

    转载自:http://blog.csdn.net/zouxy09/article/details/20319673 机器学习算法与Python实践这个系列主要是参考<机器学习实战>这本书. ...

  9. python爬去百度百科词条_Python爬虫入门学习实践——爬取小说

    本学期开始接触python,python是一种面向对象的.解释型的.通用的.开源的脚本编程语言,我觉得python最大的优点就是简单易用,学习起来比较上手,对代码格式的要求没有那么严格,这种风格使得我 ...

最新文章

  1. Tensorflow MNIST浅层神经网络的解释和答复
  2. kubernetes 清理日志命令_Kubernetes之容器数据写满磁盘解决方法
  3. QT的QBrush类的使用
  4. 音视频技术开发周刊 | 161
  5. 2-4 测试案例helloWorld
  6. 快速傅里叶变换之后的结果含义
  7. python函数-函数进阶
  8. Python遍历文件夹获取文件名并写入excel
  9. 【java学习之路】(java框架)002.Git配置及使用
  10. C++类引用中的构造函数与析构函数的执行顺序练习
  11. php程序员工具箱v0.6,php程序员工具箱官方版
  12. 京东客小程序功能模块源码V6.0.2
  13. uview ui与element ui的区别和用法
  14. 数据分析--企业的贤内助 附下载地址
  15. Java入门基本数据类型(羊驼)
  16. 涉密台式计算机密码可以输入几次,涉密打印机、扫描仪等与涉密计算机之间不采用无线方式连接 - 作业在线问答...
  17. matlab 设置坐标轴位置/方向 y轴反向
  18. 博基计划(4)---近红外光谱过程分析中基线漂移的主要来源
  19. POJ 2431 丛林探险(优先队列)
  20. 分享具体制作流程,利用下班时间听歌,昨天挣了400多

热门文章

  1. 通过Swagger快速生成接口文档
  2. spring解耦_云端时代的解耦:使用Spring Cloud Azure构建云端原生微服务
  3. 秒拍的高性能视频播放调度系统
  4. 关于机器学习中的似然函数的理解
  5. 东芝樱花花见官:文静 | 让生活因新鲜而不凡
  6. 常用置位、清零解释及stm32f103寄存器点亮led
  7. 从花样百出的双十一看产品运营
  8. 与element ui结合省市区三级联动
  9. 用Python DIY二维码背景
  10. H5页面生成word文件及发送邮件