前面第一小节也提到了,Structured Streaming会增量的从source中读取数据,映射成一张表,对该表进行增量的查询分析,然后组合中间状态,再把结果输出到结果表,然后刷到外部存储系统sink。

本小节主要是详细讲解source 和 sink。

1. source

目前支持的内置source有:

1) File Source

从给定的目录读取数据,目前支持的格式有text,csv,json,parquet。容错,文件必须原子操作的方式放置到指定的目录下,很多文件系统支持的move操作即可实现。

path:输入目录,对所有的文件格式通用。

maxFilesPerTrigger:每次触发读取文件的最大数。

latestFirst:是否先处理最新加入的文件,当有很多文件时,该参数有用(默认是false)。

fileNameOnly:检测新文件是否只根据文件名称,而不是整个文件路径,默认是false。假如,该值设置为true,那么下列文件会被认为是同一个文件:

"file:///dataset.txt"
"s3://a/dataset.txt"
"s3n://a/b/dataset.txt"
"s3a://a/b/c/dataset.txt"

2) Kafka Source:从kafka拉取数据。仅兼容kafka 0.10.0或者更高版本。容错。

3) Socket Source(for testing):

从一个连接中读取UTF8编码的文本数据。不容错。

该source的配置主要是两个host 去链接的目标主机; port 去连接的目标端口。

4) Rate Source(for testing

每秒钟产生给定行数的数据,每个输出包括一个时间戳 timestamp(Timestamp类型,代表消息分发的时间)和value(long类型,起始行是0)。该API用来测试的。

主要配置

rowsPerSecond (默认是1):每秒钟产生数据的行数。

rampUptime(默认是,0s,单位是秒,比如可以初始化为 5s):在生产速率达到rampUptime之前需要热身加速的时间。

numPartitions(),默认是spark的默认并行度。该值代表产生数据的分区数。

source将尽力达到rowsPerSecond,但查询可能受资源约束,并且可以调整numPartitions以帮助达到所需的速度。

2.output modes 

创建了source dataframe和组织了相关处理逻辑之后,就剩下以Dataset.writeStream的形式将数据写入到sink。下面,是一些常用的配置:

1). 输出sink 细节:数据格式,位置等。

2). 输出模式:指定输出到sink的内容。

A)Append mode(default):仅仅从上次触发计算到当前新增的行会被输出到sink。仅仅支持行数据插入结果表后不进行更改的query操作。因此,这种方式能保证每行数据仅仅输出一次。例如,带有Select,where,map,flatmap,filter,join等的query操作支持append模式。

B)Complete mode:每次trigger都会将整个结果表输出到sink。这个是针对聚合操作的。

C) Updata mode:仅仅是自上次trigger之后结果表有变更的行会输出到sink。在以后的版本中会有更详细的信息。

3). 查询名称:指定一个便于识别的查询名称。

4). 触发间隔:可选的指定触发间隔,如果没有指定,系统在之前的处理完之后,立即检查数据可用性。如果指定了时间间隔,由于处理时间导致下次出发延迟,那么结束后会立即触发处理。

5). Checkpoint位置对于一些支持端到端容错保证的sink,最好指定checkpoint信息。并且最好是hdfs 兼容的容错的文件系统。下面会详细讨论。

3.sinks

FileSink:保存数据到指定的目录

noAggDF.writeStream.format("parquet").option("checkpointLocation", "path/to/checkpoint/dir").option("path", "path/to/destination/dir").start() 

Foreach sink:可以在输出的数据上做任何操作。

writeStream.foreach(...)
.start()

Console sink(for debugging):每次trigger都会将结果输出到console或stdout。

aggDF.writeStream.outputMode("complete").format("console").start()

memory sink输出以一个内存表的形式保存在内存中。Append和complete模式都支持。由于数据是保存在driver端的,所以该模式适合小数据量测试

// Have all the aggregates in an in-memory table
aggDF.writeStream.queryName("aggregates")    // this query name will be the table name.outputMode("complete").format("memory").start()
spark.sql("select * from aggregates").show()

kafkasink

支持stream和batch数据写入kafka

val ds = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").writeStream.format("kafka").option("kafka.bootstrap.servers", "host1:port1,host2:port2").option("topic", "topic1").start()

4.Sink支持的输出模式

Sink

Outputmode

Options

容错

注释

FileSink

Append

path:输出路径,必须指定

Yes, 仅一次处理

支持写入分区表。按照时间分区或许比较好使。

Kafkasink

Append,Update,complete

会有专题

YES。最少一次处理语义。

会有专题

ForeachSink

Append,Update,Complete

None

依赖于具体的ForeachWriter的实现

下面会有例子

ConsoleSink

Append,Complete,Update

NumRows:每个trigger显示的行数。Truncate:假如太长是否删除,默认是true

No

MemorySink

Append,Complete

None

No.但是在Completemode 重新query就会导致重新创建整张表

后续sql使用的表名就是queryName

要触发整个执行流程,必须调用start函数,该函数会返回一个StreamingQuery对象,该对象是正在运行的执行器的一个句柄。可以使用该对象来管理查询,后面详细介绍。

// ========== DF with no aggregations ==========
val noAggDF = deviceDataDf.select("device").where("signal > 10")   // Print new data to console
noAggDF.writeStream.format("console").start()// Write new data to Parquet files
noAggDF.writeStream.format("parquet").option("checkpointLocation", "path/to/checkpoint/dir").option("path", "path/to/destination/dir").start()// ========== DF with aggregation ==========
val aggDF = df.groupBy("device").count()// Print updated aggregations to console
aggDF.writeStream.outputMode("complete").format("console").start()// Have all the aggregates in an in-memory table
aggDF.writeStream.queryName("aggregates")    // this query name will be the table name.outputMode("complete").format("memory").start()spark.sql("select * from aggregates").show()   // interactively query in-memory table

(2)source和sink详解相关推荐

  1. Debian操作系统的源配置信息详解--Source.list配置文件详解

    转载来源:https://wiki.debian.org/SourcesList#Repository_URL Debian操作系统的源配置信息详解--Source.list配置文件详解 Debian ...

  2. Android 驱动(12)---Linux DTS(Device Tree Source)设备树详解

    Linux DTS(Device Tree Source)设备树详解 Linux DTS(Device Tree Source)设备树详解之一(背景基础知识篇) Linux DTS(Device Tr ...

  3. 高通平台msm8953 Linux DTS(Device Tree Source)设备树详解之二(DTS设备树匹配过程)

    本系列导航: 高通平台8953  Linux DTS(Device Tree Source)设备树详解之一(背景基础知识篇) 高通平台8953 Linux DTS(Device Tree Source ...

  4. 高通平台8953 Linux DTS(Device Tree Source)设备树详解之一(背景基础知识篇)

    本系列导航: 高通平台8953  Linux DTS(Device Tree Source)设备树详解之一(背景基础知识篇) 高通平台8953 Linux DTS(Device Tree Source ...

  5. source, ~/.bashrc, ~/.bash_profile详解

    source命令的作用就是用来执行一个脚本,那么: source a.sh 同直接执行 ./a.sh 有什么不同呢,比如你在一个脚本里export $KKK=111 ,如果你用./a.sh执行该脚本, ...

  6. linux下source命令使用详解

    这篇文章主要为大家详细介绍了Linux Source命令及脚本的执行方式,具有一定的参考价值,感兴趣的小伙伴们可以参考一下 Linux Source命令及脚本的执行方式解析 当我修改了/etc/pro ...

  7. linux source多个文件夹,linux下source命令使用详解

    source命令: source命令也称为"点命令",也就是一个点符号(.),是bash的内部命令. 功能:使Shell读入指定的Shell程序文件并依次执行文件中的所有语句 so ...

  8. DTS文件详解,DTS文件解析

    一.什么是DTS?为什么要引入DTS? DTS即Device Tree Source 设备树源码, Device Tree是一种描述硬件的数据结构,它起源于 OpenFirmware (OF). 在L ...

  9. Flume常用组件详解之Source

    Flume常用组件详解:Source Flume支持众多的source.sink.拦截器等组件具体实现,详细手册可参考官方文档http://flume.apache.org/FlumeUserGuid ...

最新文章

  1. 太嚣张了!会Python的人!
  2. 【Python3_进阶系列_006】Python3-单例模式
  3. Mysql实战:主从同步
  4. Silverlight 5 RC新特性探索系列:13.Silverlight 5 RC 新增对并行任务库(TPL)的支持
  5. PHP工厂模式计算面积与周长
  6. 网易云音乐上市首日收跌2.49% 盈利困境仍待解
  7. linux内核符号地址,Linux内核-模块专用地址空间
  8. Everything必知必会搜索教程
  9. 计算机管理单元受到策略限制怎么解决,组策略编辑器管理单元无法打开
  10. 14寸M1Pro-Max芯片款MAC安装Photoshop2021/2022安装教程硬核分享和激 活方式
  11. 编写程序 - 打印购物小票.
  12. php webim的开发,WebIM H5 Demo 介绍
  13. php自动加nofollow,WordPress自动给文章添加nofollow属性的实现方法
  14. 简单文本API的解析(一言)
  15. Java每日算法--罗马数字包含以下七种字符: I, V, X, L,C,D 和 M。
  16. 上取整与下取整的解析
  17. python 批量转换docx只转换了一个出现pywintypes.com_error被调用的对象已与其客户端断开连接
  18. 变位词算法C语言,程序实现英语变位词的搜索算法
  19. 知衣科技CEO郑泽宇:重新定义服装产业「柔性供应链」
  20. 小孔成像总结_相机标定是怎么回事——相机成像数学模型

热门文章

  1. Armbian 笔记六_使用 armbian-ddbr 命令 备份/还原 eMMC 系统
  2. c++ getline
  3. L293D知识全解:理论、图表、仿真和引脚排列
  4. 三步搞定esxi硬盘直通(RDM)
  5. 家居行业内卷?创新乏力?十大关键词解读IF ROOM如何定义未来主义家居
  6. C#控制多线程最大并行数量
  7. linux的生态包含哪些
  8. UI设计师有钱途还是平面设计师或者是网页设计师呢?
  9. 某数字藏品监控网站HMAC
  10. stm32F103c8t6飞控固件烧录