第一个流处理程序sparkStreaming+Java

史上最简单的spark教程
所有代码示例地址:https://github.com/Mydreamandreality/sparkResearch

(提前声明:文章由作者:张耀峰 结合自己生产中的使用经验整理,最终形成简单易懂的文章,写作不易,转载请注明)
(文章参考:Elasticsearch权威指南,Spark快速大数据分析文档,Elasticsearch官方文档,实际项目中的应用场景)
(帮到到您请点点关注,文章持续更新中!)
Git主页 https://github.com/Mydreamandreality

sparkStreming 是什么


start:让我们先了解一下sparkStreaming的基本概念

  • 在很多的应用中,我们需要即时处理收到的数据,比如实时的交易额,成交量,实时安全检测,安全报警,总之就是需要实时处理的数据
  • sparkstreaming就是spark为这些应用而设计的模型
  • sparkstreaming是弹性的,高吞吐,高容错的实时数据流处理框架
  • 它提供给我们一套和批处理非常相似的Api编写流式计算程序
  • sparkstreaming的数据源:
    • kafka
    • elasticsearch
    • habse
    • flume
    • HDFS
    • 等等
  • 也可以通过window,map等高级函数组成的复杂算法处理
  • 最终处理完的数据可以输出到文件系统,或者数据库等

数据处理流程如下图

sparkstreaming内部工作原理

  • 在sparkstreaming的内部,它接收实时输入的数据并且把数据切分为多个batch(一批),然后再处理分批流结果

数据内部流程如下

  • sparkstreaming提供了一个高级抽象叫做Dstream,代表一个连续的数据流
  • DStream可以从数据源输入口创建,也可以基于Dstream进行创建

编写我们的第一个案例

  • 我们这里还是maven的项目哈,首先引入jar包
        <!--stream流处理--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.11</artifactId><version>2.2.3</version></dependency>
  • 需求

    • 需求很简单,从本地服务器的8080端口接收一行一行的数据,并且筛选出包含error的行,打印
  • 代码案例

  • [如果兄弟看过我之前写得spark批处理教程.下面代码就很好理解]

    public static void main(String[] args) {//创建两个核心的本地线程,批处理的间隔为1秒SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("sparkStreamIng");JavaStreamingContext javaStreamingContext = new JavaStreamingContext(conf, Durations.seconds(1));//创建一个连接到IP:localhost,PORT:8080的DStreamJavaReceiverInputDStream<String> dStream = javaStreamingContext.socketTextStream("localhost", 8080);JavaDStream<String> errorLine = dStream.filter(new Function<String, Boolean>() {@Overridepublic Boolean call(String v1) throws Exception {return v1.contains("error");}});//打印包含error的行errorLine.print();try {//开始计算javaStreamingContext.start();//等待计算完成javaStreamingContext.awaitTermination();} catch (InterruptedException e) {e.printStackTrace();}}
  • 在代码中有少许的注释方便大家理解,这里再着重的解释一下这个运行的逻辑
  • 首先需要注意的是StreamingContext是所有流处理的主要入口点
  • 看我们的代码我们创建两个核心的本地线程,批处理的间隔为1秒
  • 其次呢我们使用socketTextStream()函数实时从端口接收数据流,在这个离散流(Dstream)中的每一条记录都是一行文本
  • 之后就是我们很熟悉的代码,看过我之前教程的兄弟应该是很容易就看懂了,使用filter()函数过滤包含有error的文本,只不过这里的返回值变成了(JavaDStream)离散流,不是我们批处理中的RDD了
  • 但是这个时候我们还只是设置了计算,并且实际的执行
  • 只有启动才会真正的执行,所以我们使用streamingContext.start()函数启动我们的流处理,然后print()函数就会打印每秒端口生成的文本信息中包含error数据的

运行我们的流处理程序

  • 第一步当然还是打包我们的程序啦

  • 因为我们的程序是从端口接收实时数据
  • 所以我们安装一个给端口发送数据的工具 ncat
  • 执行:yum install nmap-ncat.x86_64(注:我是centos系统64位)
  • 安装好之后显示如下:

  • 然后我们在控制台输入 nc -help 验证是否安装成功

运行

  • 这里运行我们的代码
  • 先进入我们的打包程序jar上传的目录

  • 然后执行 spark-submit命令
  • /usr/local/spark/spark-2.2.3-bin-hadoop2.7/bin/spark-submit --master local[2] --executor-memory 1g --class SparkStreamDemo chapter17-1.0-SNAPSHOT.jar
  • 运行的命令具体含义就不多说了,之前有一章是单独讲这个的,没看的兄弟可以去看一下
  • 运行效果如下图

  • 可以看到我们的流处理程序每秒都在计算一次
  • 然后这个时候我们clone session一个cmd窗口
  • 执行命令: nc -lk 8080
  • 当你输入这个命令之后就会让你向我们指定的8080端口传输内容
  • 我们就先来个包含 error的单行文本
  • 此处我输入的是errortest(忽略第一行,是我之前测试的数据)
  • 然后我们再进入spark程序这个窗体,发现打印了我们输入的errortest这行文本

第一个流处理程序运行正常,完美~

感兴趣的兄弟可以关注,完整代码都在git上,有问题留言交流,持续更新中

史上最简单的spark教程第十七章-快速开发部署第一个sparkStreaming+Java流处理程序相关推荐

  1. 史上最简单的spark教程第十三章-SparkSQL编程Java案例实践(终章)

    Spark-SQL的Java实践案例(五) 本章核心:JDBC 连接外部数据库,sparkSQL优化,故障监测 史上最简单的spark教程 所有代码示例地址:https://github.com/My ...

  2. 史上最简单的spark教程第二十三章-运行第一个机器学习Java和Python代码案例

    [提前声明] 文章由作者:张耀峰 结合自己生产中的使用经验整理,最终形成简单易懂的文章 写作不易,转载请注明,谢谢! 代码案例地址: ?https://github.com/Mydreamandrea ...

  3. 史上最简单的spark系列教程 | 完结

    <史上最简单的spark系列教程>系列: 与其说是教程不如说是改良后的个人学习笔记|| 教程参考自:<spark官方文档>,<spark快速分析>,<图解Sp ...

  4. 史上最简单的 SpringCloud 教程 | 第一篇: 服务的注册与发现(Eureka)

    最新Finchley版本请访问: https://www.fangzhipeng.com/springcloud/2018/08/30/sc-f1-eureka/ 或者 http://blog.csd ...

  5. 史上最简单的SpringCloud教程 | 第十篇: 高可用的服务注册中心

    转自:https://blog.csdn.net/forezp/article/details/81041101 文章 史上最简单的 SpringCloud 教程 | 第一篇: 服务的注册与发现(Eu ...

  6. 史上最简单的SpringCloud教程 | 第六篇: 分布式配置中心(Spring Cloud Config)

    转:https://blog.csdn.net/forezp/article/details/70037291 最新版本: 史上最简单的SpringCloud教程 | 第六篇: 分布式配置中心(Spr ...

  7. 史上最简单的SpringCloud教程 | 第五篇: 路由网关(zuul)

    转:https://blog.csdn.net/forezp/article/details/69939114 最新版本: 史上最简单的SpringCloud教程 | 第五篇: 路由网关(zuul)( ...

  8. 史上最简单的SpringCloud教程 | 第四篇:断路器(Hystrix)

    转:https://blog.csdn.net/forezp/article/details/69934399 最新版本: 史上最简单的SpringCloud教程 | 第四篇:断路器(Hystrix) ...

  9. 史上最简单的SpringCloud教程 | 第三篇: 服务消费者(Feign)

    转:https://blog.csdn.net/forezp/article/details/69808079 最新版本: 史上最简单的SpringCloud教程 | 第三篇: 服务消费者(Feign ...

最新文章

  1. HDOJ2270(How Many Friends Will Be Together With You)
  2. 用GDB调试程序(二)
  3. 在 CentOS 7 中安装并使用自动化工具 Ansible
  4. ASP.NET MVC教程:理解模型、视图和控制器(1)
  5. JS——“==”与“===”
  6. Xshell 连接报错解决:WARNING! The remote SSH server rejected X11 forwarding request.
  7. 矩阵迹的性质_矩阵(含逆)的迹、行列式关于矩阵自身的导数计算与Maple验证...
  8. Aspcms框架的webshell
  9. 【POJ - 2762】Going from u to v or from v to u?(Tarjan缩点,树形dp 或 拓扑排序,欧拉图相关)
  10. u盘启动蓝屏 索尼vaio_U盘重装系统出现蓝屏?不要急,这四个手段轻松帮你解决!...
  11. MSN-LDL论文修改(B-Y Rong20211012)
  12. python里turtle.circle什么意思_Python turtle.circle方法代碼示例
  13. python bmp转jpg_利用python实现.dcm格式图像转为.jpg格式
  14. 测试开发之缺陷报告下篇
  15. ehcache 一二事 - ssm 中ehcashe的简单配置应用
  16. linux安装最新php版本下载地址,服务器配置-使用Linux编译安装PHP指定版本
  17. MongoDB入门+深入(二)--项目实战
  18. html5新年网页做给父母的,把孝心献给父母
  19. 数学建模及数据分析上的插值处理——第三部分实践插值实战
  20. Resistors in Parallel(Gym - 102028E 2018 ICPC 焦作E题 大数+规律C++版)

热门文章

  1. 【LiteOS】小白进阶之常用 LiteOS 任务接口与基础原理详解(一)
  2. 华硕F80cr笔记本装win7旗舰版后杂音解决办法
  3. 如何选择合适的网银支付接口
  4. 有什么项目工时管理工具?
  5. 2006社区风云榜MVB评选活动
  6. hp 服务器可以安装win7系统安装失败,HP笔记本UEFI导致无法安装win7系统的解决方法...
  7. cal命令:日期显示
  8. 黑龙江省七台河市谷歌高清卫星地图下载
  9. java计算机毕业设计智慧物业管理系统源码+数据库+系统+lw文档+mybatis+运行部署
  10. swoole入门教程02-Swoole的Task使用以及swoole_client