Structured Streaming程序基本步骤

编写Structured Streaming程序的基本步骤是:
1.创建SparkSession实例;
2.创建DataFrame表示从数据源输入的每一行数据;
3.DataFrame转换,类似于RDD转换操作;
4.创建StreamingQuery开启流查询;
5.调用StreamingQuery.awaitTermination()方法,等待流查询结束。

Structured Streaming操作示例

这里还是用统计单词个数的示例,来展示如何进行Structured Streaming操作;

获取SparkSession

要操作Structured Streaming,首先要获取SparkSession实例,我们可以如下创建一个SparkSession;登录Linux系统,启动spark-shell。然后输入一下代码:

scala> import org.apache.spark.sql.functions._
scala> import org.apache.spark.sql.SparkSession
val spark = SparkSession.| builder.| appName("StructuredNetworkWordCount").| getOrCreate()
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@1d1bf7bfscala> import spark.implicits._

首先引入两个包,接着实例化一个SparkSession对象;appName方法设置spark应用名称,getOrCreate方法表示如果已存在一个SparkSession,则直接用现成的SparkSession,否则创建一个SparkSession,其实spark就是已经存在的SparkSession,因为每次开启spark-shell时,就默认开启了一个SparkSession;千万别忘了引入spark.implicits._包,否则会报错。

创建DataFrame

创建好SparkSession后,即可用SparkSession.readStream方法创建DataFrame,在spark-shell输入一下代码:

scala> val lines = spark.readStream.| format("socket").| option("host","localhost").| option("port",9999).| load()
lines: org.apache.spark.sql.DataFrame = [value: string]

SparkSession.readStream方法返回一个DataStreamReader实例,接着DataStreamReader.format设置数据源为网络套接字,这里还可以设置从文件获取数据;然后调用两次DataStreamReader.option分别设置套接字的主机和端口;最后调用DataStreamReader.load方法,返回一个DataFrame实例;

此时,lines可以理解为之前Spark2.0入门:Structured Streaming简介提到的”unbound table”,实时到来的数据,即添加到这个”unbound table”;由输出可以看出,这个DataFrame每行包含一个字符串;

DataFrame转换

获取lines这个DataFrame之后,接下来就要处理这个DataFrame;类似于RDD转换操作,DataFrame也是通过转换成新的DataFrame来处理数据;因此由之前lines获取到每行数据之后,接下来要进行分隔并进行统计;

scala> val words = lines.as[String].flatMap(_.split(" "))
words: org.apache.spark.sql.Dataset[String] = [value: string]scala> val wordCounts = words.groupBy("value").count()
wordCounts: org.apache.spark.sql.DataFrame = [value: string, count: bigint]

lines.as[String]将DataFrame转换成DataSet,其实DataFrame只是DataSet的特例,在官方的API文档上有如下声明

type DataFrame = Dataset[Row]

接着调用flatMap将一行实时数据按空格切割成单词集合;groupBy方法表示按”value”这个属性合并,count方法表示统计出每个单词出现的次数;此时的wordCounts又转换为DataFrame,且这个DataFrame有两个属性,value和count,即每次处理完一行实时数据时,都会输出单词和该单词出现的次数;

执行流查询

如果DataFrame转换操作定义结束,接下来即可开启流查询,在spark-shell输入如下:

scala> val query = wordCounts.writeStream.| outputMode("complete").| format("console").| start()
query: org.apache.spark.sql.streaming.StreamingQuery = Streaming Query - query-0 [state = ACTIVE]scala> query.awaitTermination()

wordCounts.writeStream返回一个 DataStreamWriter实例,该实例定义了将实时流查询产生结果输出到外部存储的接口;outputMode设置了’complete’模式,即每次都输出全部结果数据;format定义输出媒介,这里为控制台;最后调用start方法开启流查询并返回一个StreamingQuery实例;

最后调用StreamingQuery.awaitTermination等待查询结束;

流查询结果:

在开启查询之前,需要先在另外一个终端上开启一个NetCat简单服务程序,用于向spark流查询产生数据;在终端输入如下即可:

nc -lk 9999

表示监听本地9999端口,这样在该终端上输入数据,即可传输给spark流查询;我们先输入简单字符串”hello spark”:

hadoop@charles-Aspire-4741:/usr/local/spark$ nc -lk 9999
hello spark

即可在之前开启spark流查询的端口看到如下结果:

scala> query.awaitTermination()
-------------------------------------------
Batch: 0
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|hello|    1|
|spark|    1|
+-----+-----+

我们就得到了每个单词以及次数;当我们在NetCat终端再输入一行字符串”hello apache”,得到如下结果:

scala> query.awaitTermination()
-------------------------------------------
Batch: 0
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|hello|    1|
|spark|    1|
+-----+-----+-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
| hello|    2|
|apache|    1|
| spark|    1|
+------+-----+

我们可以看到,每处理一行数据,即输出一个结果;因此当数据源传来源源不断的实时数据时,Structured streaming可以按固定时间间隔读取若干行数据,并执行流查询,输出结果;因为这个示例程序用的是complete模式,因此每次都将结果全部输出。当然也可以设置成append(增量)模式,这样每次输出即为新增的结果行;

编写独立应用

之前是在spark-shell中一行一行的执行代码,现在我们可以把程序写成单独一个scala文件,然后提交给spark执行;首先打开一个终端,输入以下命令:

cd /usr/local/spark/mycode
mkdir streaming
cd streaming
mkdir -p src/main/scala
cd src/main/scala
vim TestStructuredStreaming.scala

这样就用vim打开一个scala文件,在该文件输入以下内容:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
object WordCountStructuredStreaming{def main(args: Array[String]){val spark = SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate()import spark.implicits._val lines = spark.readStream.format("socket").option("host","localhost").option("port",9999).load()val words = lines.as[String].flatMap(_.split(" "))val wordCounts = words.groupBy("value").count()val query = wordCounts.writeStream.outputMode("complete").format("console").start()query.awaitTermination()}
}

代码写好之后,退出终端,然后在/usr/lcoal/spark/mycode/streaming目录下创建simple.sbt文件:

cd /usr/local/spark/mycode/streaming
vim simple.sbt # 注意,是simple.sbt,不是simple.txt

打开vim编辑器以后,输入以下内容:

name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.0.0"

然后就可以执行sbt打包编译了:

cd /usr/local/spark/mycode/streaming
/usr/local/sbt/sbt package

打包成功以后,就可以输入以下命令启动这个程序:

cd /usr/local/spark/mycode/streaming
/usr/local/spark/bin/spark-submit --class "WordCountStructuredStreaming" ./target/scala-2.11/simple-project_2.11-1.0.jar

执行上输出程序之后,就开启了监听状态,当我们在NetCat终端输入”hello world”之后,spark应用终端即可输出”hello”和”world”单词出现的次数,和spark-shell输出一致;

以上就是Structured Streaming操作网络流所有内容.

Structured Streaming 案例初体验相关推荐

  1. Flink大数据实时计算系列-案例初体验:HotPages

    Flink大数据实时计算系列-案例初体验:HotPages 目录 HotPages代码 输入日志 运行结果 HotPages代码 /*** Copyright (c) 2018-2028 尚硅谷 Al ...

  2. 2021年大数据Spark(五十):Structured Streaming 案例一实时数据ETL架构

    目录 案例一 实时数据ETL架构 准备主题 ​​​​​​​模拟基站日志数据 ​​​​​​​实时增量ETL 案例一 实时数据ETL架构 在实际实时流式项目中,无论使用Storm.SparkStreami ...

  3. 我的Go+语言初体验——【三、spx案例测试_许式伟先生推荐补充(附-视频)】

    欢迎大家参与[我的Go+语言初体验]活动: 活动地址:[https://bbs.csdn.net/topics/603464006?utm_source=1594742339] 安装过程博文:[我的G ...

  4. 我的Go+语言初体验——【三、spx案例测试(附-视频)】

    欢迎大家参与[我的Go+语言初体验]活动: 活动地址:[https://bbs.csdn.net/topics/603464006?utm_source=1594742339] 安装过程博文:[我的G ...

  5. 【Spark分布式内存计算框架——Structured Streaming】3. Structured Streaming —— 入门案例:WordCount

    1.3 入门案例:WordCount 入门案例与SparkStreaming的入门案例基本一致:实时从TCP Socket读取数据(采用nc)实时进行词频统计WordCount,并将结果输出到控制台C ...

  6. React初体验-Hello React的组件化方式-React入门小案例

    文章目录 React初体验 Hello React案例演练 Hello React案例升级 Hello React的组件化 组件化的方式 数据依赖 事件绑定 其他案例练习 电影列表展示 计数器的案例 ...

  7. Structured Streaming 入门案例之WordCount

    1.编写一个流式计算的应用, 不断的接收外部系统的消息 2.对消息中的单词进行词频统计 3.统计全局的结果 步骤 Socket Server 等待 Structured Streaming 程序连接 ...

  8. 【Spark深入学习 -15】Spark Streaming前奏-Kafka初体验

    ----本节内容------- 1.Kafka基础概念 1.1 出世背景 1.2 基本原理 1.2.1.前置知识 1.2.2.架构和原理 1.2.3.基本概念 1.2.4.kafka特点 2.Kafk ...

  9. 2021年大数据Spark(五十二):Structured Streaming 事件时间窗口分析

    目录 事件时间窗口分析 时间概念 ​​​​​​​event-time ​​​​​​​延迟数据处理 ​​​​​​​延迟数据 ​​​​​​​Watermarking 水位 ​​​​​​​官方案例演示 事件 ...

最新文章

  1. python代码创建数据库_如何使用python ORM创建数据库表?
  2. 实力坑队友! CTO 写出低级 Bug,致公司 70 GB 数据遭泄露!
  3. 计算机安装双系统后系统引导修复的方法
  4. 单片机while用法c语言例子,51单片机-C语言之while(!x)的理解
  5. 若某文件系统的目录结构如下图所示,假设用户要访问文件 fault.swf ,且当前工作目录为 swshare ,则该文件的全文件名为( ),相对路径和绝对路径分别为( 请在此空作答
  6. 雪花算法之唯一ID生成器理解
  7. 格式化输出字符串变量
  8. Mac,WIN下支撑 IPV6的 sftp客户端
  9. [python]如何清屏?也就是实现clear?
  10. SpringCloud大致架构
  11. post提交参数有Date类型,总是返回400格式错误
  12. matlab 生成gcode文件,解析gcode文件以提取坐标
  13. Java 前后端分离部署方式
  14. 你知道JavaScript的继承有几种写法吗?
  15. 12 个追地铁的人:照亮生活的一次追逐
  16. 计算机专业知识3,计算机专业知识试卷3
  17. Panoramic 控件设计举例
  18. 利用OPENCV创作梵高艺术风格图片
  19. MATLAB 冒泡算法
  20. Day17 静态页面 导航及图片结构与样式

热门文章

  1. 【新书推荐】【2019.03】赤川次郎作品系列(套装共五册)
  2. .equ .long表示什么意思?
  3. 下载软件时的amd64、x86、x86-64是什么,该怎么选?
  4. Camera Tuning职业介绍
  5. 创作并推广机器人课程,给孩子带来快乐与成长
  6. 【计算几何】判断线段相交(跨立实验)
  7. matlab怎么做参数估计,[转载]参数估计(matlab)
  8. 聊聊Mybatis里面的缓存机制吧
  9. 论文阅读:Directed Greybox Fuzzing
  10. 输入年份和月份,打印出这个月有多少天