史上最简单的spark教程第十七章-快速开发部署第一个sparkStreaming+Java流处理程序
第一个流处理程序sparkStreaming+Java
史上最简单的spark教程
所有代码示例地址:https://github.com/Mydreamandreality/sparkResearch
(提前声明:文章由作者:张耀峰 结合自己生产中的使用经验整理,最终形成简单易懂的文章,写作不易,转载请注明)
(文章参考:Elasticsearch权威指南,Spark快速大数据分析文档,Elasticsearch官方文档,实际项目中的应用场景)
(帮到到您请点点关注,文章持续更新中!)
Git主页 https://github.com/Mydreamandreality
sparkStreming 是什么
start:让我们先了解一下sparkStreaming的基本概念
- 在很多的应用中,我们需要即时处理收到的数据,比如实时的交易额,成交量,实时安全检测,安全报警,总之就是需要实时处理的数据
- sparkstreaming就是spark为这些应用而设计的模型
- sparkstreaming是弹性的,高吞吐,高容错的实时数据流处理框架
- 它提供给我们一套和批处理非常相似的Api编写流式计算程序
- sparkstreaming的数据源:
- kafka
- elasticsearch
- habse
- flume
- HDFS
- 等等
- 也可以通过window,map等高级函数组成的复杂算法处理
- 最终处理完的数据可以输出到文件系统,或者数据库等
数据处理流程如下图
sparkstreaming内部工作原理
- 在sparkstreaming的内部,它接收实时输入的数据并且把数据切分为多个batch(一批),然后再处理分批流结果
数据内部流程如下
- sparkstreaming提供了一个高级抽象叫做Dstream,代表一个连续的数据流
- DStream可以从数据源输入口创建,也可以基于Dstream进行创建
编写我们的第一个案例
- 我们这里还是maven的项目哈,首先引入jar包
<!--stream流处理--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.11</artifactId><version>2.2.3</version></dependency>
需求
- 需求很简单,从本地服务器的8080端口接收一行一行的数据,并且筛选出包含error的行,打印
代码案例
[如果兄弟看过我之前写得spark批处理教程.下面代码就很好理解]
public static void main(String[] args) {//创建两个核心的本地线程,批处理的间隔为1秒SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("sparkStreamIng");JavaStreamingContext javaStreamingContext = new JavaStreamingContext(conf, Durations.seconds(1));//创建一个连接到IP:localhost,PORT:8080的DStreamJavaReceiverInputDStream<String> dStream = javaStreamingContext.socketTextStream("localhost", 8080);JavaDStream<String> errorLine = dStream.filter(new Function<String, Boolean>() {@Overridepublic Boolean call(String v1) throws Exception {return v1.contains("error");}});//打印包含error的行errorLine.print();try {//开始计算javaStreamingContext.start();//等待计算完成javaStreamingContext.awaitTermination();} catch (InterruptedException e) {e.printStackTrace();}}
- 在代码中有少许的注释方便大家理解,这里再着重的解释一下这个运行的逻辑
- 首先需要注意的是StreamingContext是所有流处理的主要入口点
- 看我们的代码我们创建两个核心的本地线程,批处理的间隔为1秒
- 其次呢我们使用socketTextStream()函数实时从端口接收数据流,在这个离散流(Dstream)中的每一条记录都是一行文本
- 之后就是我们很熟悉的代码,看过我之前教程的兄弟应该是很容易就看懂了,使用filter()函数过滤包含有error的文本,只不过这里的返回值变成了(JavaDStream)离散流,不是我们批处理中的RDD了
- 但是这个时候我们还只是设置了计算,并且实际的执行
- 只有启动才会真正的执行,所以我们使用streamingContext.start()函数启动我们的流处理,然后print()函数就会打印每秒端口生成的文本信息中包含error数据的
运行我们的流处理程序
- 第一步当然还是打包我们的程序啦
- 因为我们的程序是从端口接收实时数据
- 所以我们安装一个给端口发送数据的工具 ncat
- 执行:yum install nmap-ncat.x86_64(注:我是centos系统64位)
- 安装好之后显示如下:
- 然后我们在控制台输入
nc -help
验证是否安装成功
运行
- 这里运行我们的代码
- 先进入我们的打包程序jar上传的目录
- 然后执行 spark-submit命令
/usr/local/spark/spark-2.2.3-bin-hadoop2.7/bin/spark-submit --master local[2] --executor-memory 1g --class SparkStreamDemo chapter17-1.0-SNAPSHOT.jar
- 运行的命令具体含义就不多说了,之前有一章是单独讲这个的,没看的兄弟可以去看一下
- 运行效果如下图
- 可以看到我们的流处理程序每秒都在计算一次
- 然后这个时候我们clone session一个cmd窗口
- 执行命令:
nc -lk 8080
- 当你输入这个命令之后就会让你向我们指定的8080端口传输内容
- 我们就先来个包含 error的单行文本
- 此处我输入的是errortest(忽略第一行,是我之前测试的数据)
- 然后我们再进入spark程序这个窗体,发现打印了我们输入的errortest这行文本
第一个流处理程序运行正常,完美~
感兴趣的兄弟可以关注,完整代码都在git上,有问题留言交流,持续更新中
史上最简单的spark教程第十七章-快速开发部署第一个sparkStreaming+Java流处理程序相关推荐
- 史上最简单的spark教程第十三章-SparkSQL编程Java案例实践(终章)
Spark-SQL的Java实践案例(五) 本章核心:JDBC 连接外部数据库,sparkSQL优化,故障监测 史上最简单的spark教程 所有代码示例地址:https://github.com/My ...
- 史上最简单的spark教程第二十三章-运行第一个机器学习Java和Python代码案例
[提前声明] 文章由作者:张耀峰 结合自己生产中的使用经验整理,最终形成简单易懂的文章 写作不易,转载请注明,谢谢! 代码案例地址: ?https://github.com/Mydreamandrea ...
- 史上最简单的spark系列教程 | 完结
<史上最简单的spark系列教程>系列: 与其说是教程不如说是改良后的个人学习笔记|| 教程参考自:<spark官方文档>,<spark快速分析>,<图解Sp ...
- 史上最简单的 SpringCloud 教程 | 第一篇: 服务的注册与发现(Eureka)
最新Finchley版本请访问: https://www.fangzhipeng.com/springcloud/2018/08/30/sc-f1-eureka/ 或者 http://blog.csd ...
- 史上最简单的SpringCloud教程 | 第十篇: 高可用的服务注册中心
转自:https://blog.csdn.net/forezp/article/details/81041101 文章 史上最简单的 SpringCloud 教程 | 第一篇: 服务的注册与发现(Eu ...
- 史上最简单的SpringCloud教程 | 第六篇: 分布式配置中心(Spring Cloud Config)
转:https://blog.csdn.net/forezp/article/details/70037291 最新版本: 史上最简单的SpringCloud教程 | 第六篇: 分布式配置中心(Spr ...
- 史上最简单的SpringCloud教程 | 第五篇: 路由网关(zuul)
转:https://blog.csdn.net/forezp/article/details/69939114 最新版本: 史上最简单的SpringCloud教程 | 第五篇: 路由网关(zuul)( ...
- 史上最简单的SpringCloud教程 | 第四篇:断路器(Hystrix)
转:https://blog.csdn.net/forezp/article/details/69934399 最新版本: 史上最简单的SpringCloud教程 | 第四篇:断路器(Hystrix) ...
- 史上最简单的SpringCloud教程 | 第三篇: 服务消费者(Feign)
转:https://blog.csdn.net/forezp/article/details/69808079 最新版本: 史上最简单的SpringCloud教程 | 第三篇: 服务消费者(Feign ...
最新文章
- HDOJ2270(How Many Friends Will Be Together With You)
- 用GDB调试程序(二)
- 在 CentOS 7 中安装并使用自动化工具 Ansible
- ASP.NET MVC教程:理解模型、视图和控制器(1)
- JS——“==”与“===”
- Xshell 连接报错解决:WARNING! The remote SSH server rejected X11 forwarding request.
- 矩阵迹的性质_矩阵(含逆)的迹、行列式关于矩阵自身的导数计算与Maple验证...
- Aspcms框架的webshell
- 【POJ - 2762】Going from u to v or from v to u?(Tarjan缩点,树形dp 或 拓扑排序,欧拉图相关)
- u盘启动蓝屏 索尼vaio_U盘重装系统出现蓝屏?不要急,这四个手段轻松帮你解决!...
- MSN-LDL论文修改(B-Y Rong20211012)
- python里turtle.circle什么意思_Python turtle.circle方法代碼示例
- python bmp转jpg_利用python实现.dcm格式图像转为.jpg格式
- 测试开发之缺陷报告下篇
- ehcache 一二事 - ssm 中ehcashe的简单配置应用
- linux安装最新php版本下载地址,服务器配置-使用Linux编译安装PHP指定版本
- MongoDB入门+深入(二)--项目实战
- html5新年网页做给父母的,把孝心献给父母
- 数学建模及数据分析上的插值处理——第三部分实践插值实战
- Resistors in Parallel(Gym - 102028E 2018 ICPC 焦作E题 大数+规律C++版)
热门文章
- 【LiteOS】小白进阶之常用 LiteOS 任务接口与基础原理详解(一)
- 华硕F80cr笔记本装win7旗舰版后杂音解决办法
- 如何选择合适的网银支付接口
- 有什么项目工时管理工具?
- 2006社区风云榜MVB评选活动
- hp 服务器可以安装win7系统安装失败,HP笔记本UEFI导致无法安装win7系统的解决方法...
- cal命令:日期显示
- 黑龙江省七台河市谷歌高清卫星地图下载
- java计算机毕业设计智慧物业管理系统源码+数据库+系统+lw文档+mybatis+运行部署
- swoole入门教程02-Swoole的Task使用以及swoole_client