一 概览

Structured Streaming是一种可伸缩的、容错的、基于Spark SQL引擎的流式计算引擎。我们可以使用与针对静态数据的批处理计算操作一样的方式来编写流式计算操作。随着数据不断地到达,Spark SQL引擎会以一种增量的方式来执行这些操作,并且持续更新结算结果。可以使用java、scala、python等编程语言,以及dataset/dataframe api来编写计算操作,执行数据流的聚合、基于event的滑动窗口、流式数据与离线数据的join等操作。所有这些操作都与Spark SQL使用一套引擎来执行。此外,structured streaming会通过checkpoint和预写日志等机制来实现一次且仅一次的语义。简单来说,对于开发人员来说,根本不用去考虑是流式计算,还是批处理,只要使用同样的方式来编写计算操作即可,structured streaming在底层会自动去实现快速、可伸缩、容错、一次且仅一次语义。

二 入门案例

1、创建SparkSession

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;import java.util.Arrays;
import java.util.Iterator;SparkSession spark = SparkSession.builder().appName("JavaStructuredNetworkWordCount").getOrCreate();

2、创建DataFrame用于表示从本地9999端口监听到的数据,然后对DataFrame进行转换,计算单词数。

// Create DataFrame representing the stream of input lines from connection to localhost:9999
Dataset<Row> lines = spark.readStream().format("socket").option("host", "localhost").option("port", 9999).load();// Split the lines into words
Dataset<String> words = lines.as(Encoders.STRING()).flatMap((FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(), Encoders.STRING());// Generate running word count
Dataset<Row> wordCounts = words.groupBy("value").count();

lines这个DataFrame表示一个包含流文本数据的无界表。 此表包含一列名为“value”的字符串,并且流文本数据中的每一行都将成为表中的一行。 需要注意的是,这并不是正在收到任何数据,因为我们只是设置转换,还没有开始。 接下来,我们使用.as [Encoders.STRING()]将DataFrame转换为String的Dataset,以便我们可以应用flatMap操作将每一行分割成多个单词。 所得词汇Dataset包含所有单词。 最后,我们已经通过将数据集中唯一的值进行分组并对它们进行计数来定义wordCounts DataFrame。 请注意,这是一个streaming DataFrame,它表示运行的stream的word counts。 
我们现在已经设置了关于streaming data的查询。 剩下的就是实际开始接收数据并计算数量。 为此,我们将其设置为在每次更新时将完整的计数集合(由outputMode(“complete”)指定)打印到控制台。 然后使用start()启动流计算。

// Start running the query that prints the running counts to the console
StreamingQuery query = wordCounts.writeStream().outputMode("complete").format("console").start();query.awaitTermination();

执行此代码后,流式计算将在后台启动。 查询对象是该活动流查询的句柄,我们已经决定使用query.awaitTermination()等待查询的终止,以防止查询处于活动状态时退出进程。 
现在我们编译Spark应用程序代码,运行Netcat作为数据服务器。

$ nc -lk 9999

然后,我们在netcat服务器的终端中输入的任何行将每秒计数并打印在屏幕上。它将看起来像下面这样:

$ nc -lk 9999
apache spark
apache hadoop
# TERMINAL 2: RUNNING JavaStructuredNetworkWordCount$ ./bin/run-example org.apache.spark.examples.sql.streaming.JavaStructuredNetworkWordCount localhost 9999-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    1|
| spark|    1|
+------+-----+-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    2|
| spark|    1|
|hadoop|    1|
+------+-----+
...

Structured Streaming-快速入门相关推荐

  1. Structured Streaming基础入门

    Structured Streaming 1. 回顾和展望 1.1. Spark 编程模型的进化过程 RDD rdd.flatMap(_.split(" ")).map((_, 1 ...

  2. Structured Streaming《入门示例》

    概述: Structured Streaming 是一个构建在Spark SQL 引擎上,可扩展,容错的的流处理引擎.您可以像编写静态数据的批处理程序一样,编写流处理程序.Spark SQL 引擎会增 ...

  3. Structured Streaming 开发入门

    Structured Streaming 作为 Spark 家族的新成员,通过 Spark SQL/DataFrame 来处理 Batch/Streaming 数据,基本的 SparkSQL API ...

  4. 【Spark分布式内存计算框架——Structured Streaming】3. Structured Streaming —— 入门案例:WordCount

    1.3 入门案例:WordCount 入门案例与SparkStreaming的入门案例基本一致:实时从TCP Socket读取数据(采用nc)实时进行词频统计WordCount,并将结果输出到控制台C ...

  5. Structured Streaming 入门案例之WordCount

    1.编写一个流式计算的应用, 不断的接收外部系统的消息 2.对消息中的单词进行词频统计 3.统计全局的结果 步骤 Socket Server 等待 Structured Streaming 程序连接 ...

  6. [学习笔记]黑马程序员Spark全套视频教程,4天spark3.2快速入门到精通,基于Python语言的spark教程

    文章目录 视频资料: 思维导图 一.Spark基础入门(环境搭建.入门概念) 第二章:Spark环境搭建-Local 2.1 课程服务器环境 2.2 Local模式基本原理 2.3 安装包下载 2.4 ...

  7. Structured Streaming编程 Programming Guide

    Structured Streaming编程 Programming Guide • Overview • Quick Example • Programming Model o Basic Conc ...

  8. 2021年大数据Spark(四十五):Structured Streaming Sources 输入源

    目录 Sources 输入源 Socket数据源-入门案例 需求 编程实现 ​​​​​​​文件数据源-了解 ​​​​​​​需求 ​​​​​​​代码实现 ​​​​​​​Rate source-了解 So ...

  9. Apache Hive 快速入门 (CentOS 7.3 + Hadoop-2.8 + Hive-2.1.1)

    2019独角兽企业重金招聘Python工程师标准>>> 本文节选自<Netkiller Database 手札> 第 63 章 Apache Hive 目录 63.1. ...

最新文章

  1. 智能POS常见问题整理
  2. Vue创建组件的三种方式
  3. postman 抓包工具charles的使用
  4. Java---利用程序实现在控制台聊天
  5. css 3小时从入门到略通
  6. mysql metadata lock(二)
  7. 用友U8对账不平,对账错误简单处理方法
  8. .NET(WinCE、WM)开发转Android开发 ——Xamarin和Smobiler对比...
  9. gbadev上的资料搬运贴
  10. 计算机应用项目教案,计算机应用基础2项目二--电子教案.doc
  11. jmp怎么做合并的箱线图_基于JMP 15的箱线图(Box Plot)的着色
  12. GRE红宝书使用方法介绍
  13. 了解数据分析师,转行数据分析师,成为数据分析师
  14. python urllib 函数_python的urllib.quote()和urllib.unquote()的等效javascript函数
  15. Linux挂载OneDrive
  16. java文件恢复软件,误删文件恢复-误删文件恢复大师 v1.1.0 最新版
  17. 安卓手机优化,修改build.prop
  18. 数据报表常用excel公式
  19. 【obs-studio开源项目从入门到放弃】obs-studio项目简介和架构
  20. 互联网圈子,黄进磊竟然把它解释的如此霸气!

热门文章

  1. 零基础小白如何学会Java?
  2. [置顶] 多任务和多线程(2)
  3. 孩子的乐高课程,纠结啊
  4. Polarr Photo教程:通过后期增强其阳光表现力
  5. 理解Java的OCP和IOC原理
  6. 高德地图——驾驶路线
  7. 基于词典的中文情感倾向分析算法设计
  8. 专家称楼市细分市场出现机会 “以价换量”成趋势
  9. Baumer工业相机堡盟工业相机如何使用BGAPI SDK解决两个万兆网相机的同步采集不同步的问题
  10. java如何遍历combobox_获取ComboBox的SelectedItem(MVVM)