(2)source和sink详解
前面第一小节也提到了,Structured Streaming会增量的从source中读取数据,映射成一张表,对该表进行增量的查询分析,然后组合中间状态,再把结果输出到结果表,然后刷到外部存储系统sink。
本小节主要是详细讲解source 和 sink。
1. source
目前支持的内置source有:
1) File Source
从给定的目录读取数据,目前支持的格式有text,csv,json,parquet。容错,文件必须原子操作的方式放置到指定的目录下,很多文件系统支持的move操作即可实现。
path:输入目录,对所有的文件格式通用。
maxFilesPerTrigger:每次触发读取文件的最大数。
latestFirst:是否先处理最新加入的文件,当有很多文件时,该参数有用(默认是false)。
fileNameOnly:检测新文件是否只根据文件名称,而不是整个文件路径,默认是false。假如,该值设置为true,那么下列文件会被认为是同一个文件:
"file:///dataset.txt"
"s3://a/dataset.txt"
"s3n://a/b/dataset.txt"
"s3a://a/b/c/dataset.txt"
2) Kafka Source:从kafka拉取数据。仅兼容kafka 0.10.0或者更高版本。容错。
3) Socket Source(for testing):
从一个连接中读取UTF8编码的文本数据。不容错。
该source的配置主要是两个host 去链接的目标主机; port 去连接的目标端口。
4) Rate Source(for testing)
每秒钟产生给定行数的数据,每个输出包括一个时间戳 timestamp(Timestamp类型,代表消息分发的时间)和value(long类型,起始行是0)。该API用来测试的。
主要配置
rowsPerSecond (默认是1):每秒钟产生数据的行数。
rampUptime(默认是,0s,单位是秒,比如可以初始化为 5s):在生产速率达到rampUptime之前需要热身加速的时间。
numPartitions(),默认是spark的默认并行度。该值代表产生数据的分区数。
source将尽力达到rowsPerSecond,但查询可能受资源约束,并且可以调整numPartitions以帮助达到所需的速度。
2.output modes
创建了source dataframe和组织了相关处理逻辑之后,就剩下以Dataset.writeStream的形式将数据写入到sink。下面,是一些常用的配置:
1). 输出sink 细节:数据格式,位置等。
2). 输出模式:指定输出到sink的内容。
A)Append mode(default):仅仅从上次触发计算到当前新增的行会被输出到sink。仅仅支持行数据插入结果表后不进行更改的query操作。因此,这种方式能保证每行数据仅仅输出一次。例如,带有Select,where,map,flatmap,filter,join等的query操作支持append模式。
B)Complete mode:每次trigger都会将整个结果表输出到sink。这个是针对聚合操作的。
C) Updata mode:仅仅是自上次trigger之后结果表有变更的行会输出到sink。在以后的版本中会有更详细的信息。
3). 查询名称:指定一个便于识别的查询名称。
4). 触发间隔:可选的指定触发间隔,如果没有指定,系统在之前的处理完之后,立即检查数据可用性。如果指定了时间间隔,由于处理时间导致下次出发延迟,那么结束后会立即触发处理。
5). Checkpoint位置:对于一些支持端到端容错保证的sink,最好指定checkpoint信息。并且最好是hdfs 兼容的容错的文件系统。下面会详细讨论。
3.sinks
FileSink:保存数据到指定的目录
noAggDF.writeStream.format("parquet").option("checkpointLocation", "path/to/checkpoint/dir").option("path", "path/to/destination/dir").start()
Foreach sink:可以在输出的数据上做任何操作。
writeStream.foreach(...)
.start()
Console sink(for debugging):每次trigger都会将结果输出到console或stdout。
aggDF.writeStream.outputMode("complete").format("console").start()
memory sink:输出以一个内存表的形式保存在内存中。Append和complete模式都支持。由于数据是保存在driver端的,所以该模式适合小数据量测试。
// Have all the aggregates in an in-memory table
aggDF.writeStream.queryName("aggregates") // this query name will be the table name.outputMode("complete").format("memory").start()
spark.sql("select * from aggregates").show()
kafkasink
支持stream和batch数据写入kafka
val ds = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").writeStream.format("kafka").option("kafka.bootstrap.servers", "host1:port1,host2:port2").option("topic", "topic1").start()
4.Sink支持的输出模式
Sink |
Outputmode |
Options |
容错 |
注释 |
FileSink |
Append |
path:输出路径,必须指定 |
Yes, 仅一次处理 |
支持写入分区表。按照时间分区或许比较好使。 |
Kafkasink |
Append,Update,complete |
会有专题 |
YES。最少一次处理语义。 |
会有专题 |
ForeachSink |
Append,Update,Complete |
None |
依赖于具体的ForeachWriter的实现 |
下面会有例子 |
ConsoleSink |
Append,Complete,Update |
NumRows:每个trigger显示的行数。Truncate:假如太长是否删除,默认是true |
No |
|
MemorySink |
Append,Complete |
None |
No.但是在Completemode 重新query就会导致重新创建整张表 |
后续sql使用的表名就是queryName |
要触发整个执行流程,必须调用start函数,该函数会返回一个StreamingQuery对象,该对象是正在运行的执行器的一个句柄。可以使用该对象来管理查询,后面详细介绍。
// ========== DF with no aggregations ==========
val noAggDF = deviceDataDf.select("device").where("signal > 10") // Print new data to console
noAggDF.writeStream.format("console").start()// Write new data to Parquet files
noAggDF.writeStream.format("parquet").option("checkpointLocation", "path/to/checkpoint/dir").option("path", "path/to/destination/dir").start()// ========== DF with aggregation ==========
val aggDF = df.groupBy("device").count()// Print updated aggregations to console
aggDF.writeStream.outputMode("complete").format("console").start()// Have all the aggregates in an in-memory table
aggDF.writeStream.queryName("aggregates") // this query name will be the table name.outputMode("complete").format("memory").start()spark.sql("select * from aggregates").show() // interactively query in-memory table
(2)source和sink详解相关推荐
- Debian操作系统的源配置信息详解--Source.list配置文件详解
转载来源:https://wiki.debian.org/SourcesList#Repository_URL Debian操作系统的源配置信息详解--Source.list配置文件详解 Debian ...
- Android 驱动(12)---Linux DTS(Device Tree Source)设备树详解
Linux DTS(Device Tree Source)设备树详解 Linux DTS(Device Tree Source)设备树详解之一(背景基础知识篇) Linux DTS(Device Tr ...
- 高通平台msm8953 Linux DTS(Device Tree Source)设备树详解之二(DTS设备树匹配过程)
本系列导航: 高通平台8953 Linux DTS(Device Tree Source)设备树详解之一(背景基础知识篇) 高通平台8953 Linux DTS(Device Tree Source ...
- 高通平台8953 Linux DTS(Device Tree Source)设备树详解之一(背景基础知识篇)
本系列导航: 高通平台8953 Linux DTS(Device Tree Source)设备树详解之一(背景基础知识篇) 高通平台8953 Linux DTS(Device Tree Source ...
- source, ~/.bashrc, ~/.bash_profile详解
source命令的作用就是用来执行一个脚本,那么: source a.sh 同直接执行 ./a.sh 有什么不同呢,比如你在一个脚本里export $KKK=111 ,如果你用./a.sh执行该脚本, ...
- linux下source命令使用详解
这篇文章主要为大家详细介绍了Linux Source命令及脚本的执行方式,具有一定的参考价值,感兴趣的小伙伴们可以参考一下 Linux Source命令及脚本的执行方式解析 当我修改了/etc/pro ...
- linux source多个文件夹,linux下source命令使用详解
source命令: source命令也称为"点命令",也就是一个点符号(.),是bash的内部命令. 功能:使Shell读入指定的Shell程序文件并依次执行文件中的所有语句 so ...
- DTS文件详解,DTS文件解析
一.什么是DTS?为什么要引入DTS? DTS即Device Tree Source 设备树源码, Device Tree是一种描述硬件的数据结构,它起源于 OpenFirmware (OF). 在L ...
- Flume常用组件详解之Source
Flume常用组件详解:Source Flume支持众多的source.sink.拦截器等组件具体实现,详细手册可参考官方文档http://flume.apache.org/FlumeUserGuid ...
最新文章
- 太嚣张了!会Python的人!
- 【Python3_进阶系列_006】Python3-单例模式
- Mysql实战:主从同步
- Silverlight 5 RC新特性探索系列:13.Silverlight 5 RC 新增对并行任务库(TPL)的支持
- PHP工厂模式计算面积与周长
- 网易云音乐上市首日收跌2.49% 盈利困境仍待解
- linux内核符号地址,Linux内核-模块专用地址空间
- Everything必知必会搜索教程
- 计算机管理单元受到策略限制怎么解决,组策略编辑器管理单元无法打开
- 14寸M1Pro-Max芯片款MAC安装Photoshop2021/2022安装教程硬核分享和激 活方式
- 编写程序 - 打印购物小票.
- php webim的开发,WebIM H5 Demo 介绍
- php自动加nofollow,WordPress自动给文章添加nofollow属性的实现方法
- 简单文本API的解析(一言)
- Java每日算法--罗马数字包含以下七种字符: I, V, X, L,C,D 和 M。
- 上取整与下取整的解析
- python 批量转换docx只转换了一个出现pywintypes.com_error被调用的对象已与其客户端断开连接
- 变位词算法C语言,程序实现英语变位词的搜索算法
- 知衣科技CEO郑泽宇:重新定义服装产业「柔性供应链」
- 小孔成像总结_相机标定是怎么回事——相机成像数学模型