Structured Streaming 案例初体验
Structured Streaming程序基本步骤
编写Structured Streaming程序的基本步骤是:
1.创建SparkSession实例;
2.创建DataFrame表示从数据源输入的每一行数据;
3.DataFrame转换,类似于RDD转换操作;
4.创建StreamingQuery开启流查询;
5.调用StreamingQuery.awaitTermination()方法,等待流查询结束。
Structured Streaming操作示例
这里还是用统计单词个数的示例,来展示如何进行Structured Streaming操作;
获取SparkSession
要操作Structured Streaming,首先要获取SparkSession实例,我们可以如下创建一个SparkSession;登录Linux系统,启动spark-shell。然后输入一下代码:
scala> import org.apache.spark.sql.functions._ scala> import org.apache.spark.sql.SparkSession val spark = SparkSession.| builder.| appName("StructuredNetworkWordCount").| getOrCreate() spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@1d1bf7bfscala> import spark.implicits._
首先引入两个包,接着实例化一个SparkSession对象;appName方法设置spark应用名称,getOrCreate方法表示如果已存在一个SparkSession,则直接用现成的SparkSession,否则创建一个SparkSession,其实spark就是已经存在的SparkSession,因为每次开启spark-shell时,就默认开启了一个SparkSession;千万别忘了引入spark.implicits._包,否则会报错。
创建DataFrame
创建好SparkSession后,即可用SparkSession.readStream方法创建DataFrame,在spark-shell输入一下代码:
scala> val lines = spark.readStream.| format("socket").| option("host","localhost").| option("port",9999).| load() lines: org.apache.spark.sql.DataFrame = [value: string]
SparkSession.readStream方法返回一个DataStreamReader实例,接着DataStreamReader.format设置数据源为网络套接字,这里还可以设置从文件获取数据;然后调用两次DataStreamReader.option分别设置套接字的主机和端口;最后调用DataStreamReader.load方法,返回一个DataFrame实例;
此时,lines可以理解为之前Spark2.0入门:Structured Streaming简介提到的”unbound table”,实时到来的数据,即添加到这个”unbound table”;由输出可以看出,这个DataFrame每行包含一个字符串;
DataFrame转换
获取lines这个DataFrame之后,接下来就要处理这个DataFrame;类似于RDD转换操作,DataFrame也是通过转换成新的DataFrame来处理数据;因此由之前lines获取到每行数据之后,接下来要进行分隔并进行统计;
scala> val words = lines.as[String].flatMap(_.split(" ")) words: org.apache.spark.sql.Dataset[String] = [value: string]scala> val wordCounts = words.groupBy("value").count() wordCounts: org.apache.spark.sql.DataFrame = [value: string, count: bigint]
lines.as[String]将DataFrame转换成DataSet,其实DataFrame只是DataSet的特例,在官方的API文档上有如下声明
type DataFrame = Dataset[Row]
接着调用flatMap将一行实时数据按空格切割成单词集合;groupBy方法表示按”value”这个属性合并,count方法表示统计出每个单词出现的次数;此时的wordCounts又转换为DataFrame,且这个DataFrame有两个属性,value和count,即每次处理完一行实时数据时,都会输出单词和该单词出现的次数;
执行流查询
如果DataFrame转换操作定义结束,接下来即可开启流查询,在spark-shell输入如下:
scala> val query = wordCounts.writeStream.| outputMode("complete").| format("console").| start() query: org.apache.spark.sql.streaming.StreamingQuery = Streaming Query - query-0 [state = ACTIVE]scala> query.awaitTermination()
wordCounts.writeStream返回一个 DataStreamWriter实例,该实例定义了将实时流查询产生结果输出到外部存储的接口;outputMode设置了’complete’模式,即每次都输出全部结果数据;format定义输出媒介,这里为控制台;最后调用start方法开启流查询并返回一个StreamingQuery实例;
最后调用StreamingQuery.awaitTermination等待查询结束;
流查询结果:
在开启查询之前,需要先在另外一个终端上开启一个NetCat简单服务程序,用于向spark流查询产生数据;在终端输入如下即可:
nc -lk 9999
表示监听本地9999端口,这样在该终端上输入数据,即可传输给spark流查询;我们先输入简单字符串”hello spark”:
hadoop@charles-Aspire-4741:/usr/local/spark$ nc -lk 9999
hello spark
即可在之前开启spark流查询的端口看到如下结果:
scala> query.awaitTermination()
-------------------------------------------
Batch: 0
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|hello| 1|
|spark| 1|
+-----+-----+
我们就得到了每个单词以及次数;当我们在NetCat终端再输入一行字符串”hello apache”,得到如下结果:
scala> query.awaitTermination()
-------------------------------------------
Batch: 0
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|hello| 1|
|spark| 1|
+-----+-----+-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
| hello| 2|
|apache| 1|
| spark| 1|
+------+-----+
我们可以看到,每处理一行数据,即输出一个结果;因此当数据源传来源源不断的实时数据时,Structured streaming可以按固定时间间隔读取若干行数据,并执行流查询,输出结果;因为这个示例程序用的是complete模式,因此每次都将结果全部输出。当然也可以设置成append(增量)模式,这样每次输出即为新增的结果行;
编写独立应用
之前是在spark-shell中一行一行的执行代码,现在我们可以把程序写成单独一个scala文件,然后提交给spark执行;首先打开一个终端,输入以下命令:
cd /usr/local/spark/mycode mkdir streaming cd streaming mkdir -p src/main/scala cd src/main/scala vim TestStructuredStreaming.scala
这样就用vim打开一个scala文件,在该文件输入以下内容:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
object WordCountStructuredStreaming{def main(args: Array[String]){val spark = SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate()import spark.implicits._val lines = spark.readStream.format("socket").option("host","localhost").option("port",9999).load()val words = lines.as[String].flatMap(_.split(" "))val wordCounts = words.groupBy("value").count()val query = wordCounts.writeStream.outputMode("complete").format("console").start()query.awaitTermination()}
}
代码写好之后,退出终端,然后在/usr/lcoal/spark/mycode/streaming目录下创建simple.sbt文件:
cd /usr/local/spark/mycode/streaming vim simple.sbt # 注意,是simple.sbt,不是simple.txt
打开vim编辑器以后,输入以下内容:
name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.0.0"
然后就可以执行sbt打包编译了:
cd /usr/local/spark/mycode/streaming /usr/local/sbt/sbt package
打包成功以后,就可以输入以下命令启动这个程序:
cd /usr/local/spark/mycode/streaming /usr/local/spark/bin/spark-submit --class "WordCountStructuredStreaming" ./target/scala-2.11/simple-project_2.11-1.0.jar
执行上输出程序之后,就开启了监听状态,当我们在NetCat终端输入”hello world”之后,spark应用终端即可输出”hello”和”world”单词出现的次数,和spark-shell输出一致;
以上就是Structured Streaming操作网络流所有内容.
Structured Streaming 案例初体验相关推荐
- Flink大数据实时计算系列-案例初体验:HotPages
Flink大数据实时计算系列-案例初体验:HotPages 目录 HotPages代码 输入日志 运行结果 HotPages代码 /*** Copyright (c) 2018-2028 尚硅谷 Al ...
- 2021年大数据Spark(五十):Structured Streaming 案例一实时数据ETL架构
目录 案例一 实时数据ETL架构 准备主题 模拟基站日志数据 实时增量ETL 案例一 实时数据ETL架构 在实际实时流式项目中,无论使用Storm.SparkStreami ...
- 我的Go+语言初体验——【三、spx案例测试_许式伟先生推荐补充(附-视频)】
欢迎大家参与[我的Go+语言初体验]活动: 活动地址:[https://bbs.csdn.net/topics/603464006?utm_source=1594742339] 安装过程博文:[我的G ...
- 我的Go+语言初体验——【三、spx案例测试(附-视频)】
欢迎大家参与[我的Go+语言初体验]活动: 活动地址:[https://bbs.csdn.net/topics/603464006?utm_source=1594742339] 安装过程博文:[我的G ...
- 【Spark分布式内存计算框架——Structured Streaming】3. Structured Streaming —— 入门案例:WordCount
1.3 入门案例:WordCount 入门案例与SparkStreaming的入门案例基本一致:实时从TCP Socket读取数据(采用nc)实时进行词频统计WordCount,并将结果输出到控制台C ...
- React初体验-Hello React的组件化方式-React入门小案例
文章目录 React初体验 Hello React案例演练 Hello React案例升级 Hello React的组件化 组件化的方式 数据依赖 事件绑定 其他案例练习 电影列表展示 计数器的案例 ...
- Structured Streaming 入门案例之WordCount
1.编写一个流式计算的应用, 不断的接收外部系统的消息 2.对消息中的单词进行词频统计 3.统计全局的结果 步骤 Socket Server 等待 Structured Streaming 程序连接 ...
- 【Spark深入学习 -15】Spark Streaming前奏-Kafka初体验
----本节内容------- 1.Kafka基础概念 1.1 出世背景 1.2 基本原理 1.2.1.前置知识 1.2.2.架构和原理 1.2.3.基本概念 1.2.4.kafka特点 2.Kafk ...
- 2021年大数据Spark(五十二):Structured Streaming 事件时间窗口分析
目录 事件时间窗口分析 时间概念 event-time 延迟数据处理 延迟数据 Watermarking 水位 官方案例演示 事件 ...
最新文章
- python代码创建数据库_如何使用python ORM创建数据库表?
- 实力坑队友! CTO 写出低级 Bug,致公司 70 GB 数据遭泄露!
- 计算机安装双系统后系统引导修复的方法
- 单片机while用法c语言例子,51单片机-C语言之while(!x)的理解
- 若某文件系统的目录结构如下图所示,假设用户要访问文件 fault.swf ,且当前工作目录为 swshare ,则该文件的全文件名为( ),相对路径和绝对路径分别为( 请在此空作答
- 雪花算法之唯一ID生成器理解
- 格式化输出字符串变量
- Mac,WIN下支撑 IPV6的 sftp客户端
- [python]如何清屏?也就是实现clear?
- SpringCloud大致架构
- post提交参数有Date类型,总是返回400格式错误
- matlab 生成gcode文件,解析gcode文件以提取坐标
- Java 前后端分离部署方式
- 你知道JavaScript的继承有几种写法吗?
- 12 个追地铁的人:照亮生活的一次追逐
- 计算机专业知识3,计算机专业知识试卷3
- Panoramic 控件设计举例
- 利用OPENCV创作梵高艺术风格图片
- MATLAB 冒泡算法
- Day17 静态页面 导航及图片结构与样式