需求

现在想要通过socket发送数据, 然后Spark Streaming接收数据并且统计socket发送的每个单词的个数.

1. 架构图

2. 实现流程

  1. 安装并启动生产者
    首先在linux服务器上用yum安装netcat工具,netcat工具用了发送socket套接字,netcat的命令是nc, 它使用来设置路由器的,我们可以利用它向某个端口发送数据.
    linux安装netcat命令如下:
yum intstall -y nc
  1. 通过netcat工具向指定的端口发送数据
nc -lk 9999
  1. IDEA编写Spark Streaming代码
package cn.acec.sparkStreamingtestimport org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
/*** sparkStreming流式处理接受socket数据,实现单词统计*/
object SparkStreamingNC{def main(args: Array[String]): Unit = {//配置sparkConf参数val sparkConf: SparkConf = new   SparkConf().setAppName("SparkStreamingTCP").setMaster("local[2]")//构建sparkContext对象val sc: SparkContext = new SparkContext(sparkConf)//设置日志输出级别sc.setLogLevel("WARN")//构建StreamingContext对象,每个批处理的时间间隔val scc: StreamingContext = new StreamingContext(sc,Seconds(5))//注册一个监听的IP地址和端口  用来收集数据val lines: ReceiverInputDStream[String] = scc.socketTextStream("192.168.200.160",9999)//切分每一行记录val words: DStream[String] = lines.flatMap(_.split(" "))//每个单词记为1val wordAndOne: DStream[(String, Int)] = words.map((_,1))//分组聚合val result: DStream[(String, Int)] = wordAndOne.reduceByKey(_+_)//打印数据result.print()scc.start()scc.awaitTermination()}
}

注意:
由于使用的是本地模式local[2], 所以可以直接在本地运行程序
要指定并行度, 如在本地运行设置setMaster(“local[2]”),相当于启动两个线程,一个给receiver,一个给computer。如果是在集群中运行,必须要求集群中可用core数大于1

3. 执行查看结果

  1. 先执行nc -lk 9999
  2. 然后再在IDEA中启动代码
  3. 不断的在1.中输入不同的单词,观察IDEA控制台输出

4. 结果

现象:sparkStreaming每隔5s计算一次当前5s内的数据,然后将每个批次的数据输出。

DStream实战之Spark Streaming接收socket数据实现WordCount 31相关推荐

  1. DStream实战之Spark Streaming整合fulme实战, Flume向Spark Streaming中push推数据 36

    前言 本文所需要的安装包&Flume配置文件,博主都已上传,链接为本文涉及安装包&Flume配置文件本文涉及的安装包&Flume配置文件,请自行下载~ flume作为日志实时采 ...

  2. spark streaming 接收 kafka 数据java代码WordCount示例

    1. 首先启动zookeeper 2. 启动kafka 3. 核心代码 生产者生产消息的java代码,生成要统计的单词 package streaming;import java.util.Prope ...

  3. sparkstreaming监听hdfs目录_大数据系列之Spark Streaming接入Kafka数据

    Spark Streaming官方提供Receiver-based和Direct Approach两种方法接入Kafka数据,本文简单介绍两种方式的pyspark实现. 1.Spark Streami ...

  4. Spark Streaming揭秘 Day16 数据清理机制

    Spark Streaming揭秘 Day16 数据清理机制 今天主要来讲下Spark的数据清理机制,我们都知道,Spark是运行在jvm上的,虽然jvm本身就有对象的自动回收工作,但是,如果自己不进 ...

  5. Spark Streaming通过Socket检测空气质量

    作业描述: 针对当前空⽓质量监测数据,环保部门需要根据监测数据实时发布预警信息,需要我们在实时到达的六种污染物监测数据中,根据每⼀种数据的监测值进⾏报警检测. 输⼊数据:"空⽓质量监控数据& ...

  6. Spark Streaming处理Socket流简单实例

    在本文中我将在IDEA工具中开发一个SparkStream程序用于监听本机9999端口所接收的数据 首先,我们将Spark Streaming类的名称以及从StreamingContext进行的一些隐 ...

  7. Spark Streaming读取Kafka数据的两种方式

    Kafka在0.8和0.10之间引入了一种新的消费者API,因此,Spark Streaming与Kafka集成,有两种包可以选择: spark-streaming-kafka-0-8与spark-s ...

  8. Spark大数据分析与实战:Spark Streaming编程初级实践

    Spark Streaming编程初级实践 一.安装Hadoop和Spark 具体的安装过程在我以前的博客里面有,大家可以通过以下链接进入操作: Hadoop的安装:https://blog.csdn ...

  9. Flume+Kafka+Spark Streaming实现大数据实时流式数据采集

    近年来,随着企业信息化建设的飞速发展,大数据应用的问题越来越备受关注.很多企业投入大量的人力.物力和财力建设企业大数据平台,平台建设工作涵盖数据采集.数据处理.数据存储.数据服务.数据展示以及数据质量 ...

最新文章

  1. apache服务Forbidden 403问题精彩总结
  2. java.lang.UnsupportedClassVersionError: Bad version number in .class file
  3. javarxtx串口接收时数据会分成好几段_【STM32】串口通信基本原理(超基础、详细版)...
  4. [源码和文档分享]基于C语言的物流配送管理信息系统
  5. synchronized锁升级_synchronized详解以及锁的膨胀升级过程
  6. javascript闭包_JavaScript闭包基本指南
  7. 饿了么上架iPhone 12:最快花呗10分钟拿到手
  8. CoreData 从入门到精通(三)关联表的创建
  9. JS中将变量转为字符串
  10. matlab画空间直线,空间直线x y=z怎么画
  11. 计算机显卡驱动不匹配,显卡驱动与系统不兼容?尤其是老电脑
  12. 【雨滴桌面】简洁桌面天气皮肤YcWea5.5,直接通过HTML文档获取天气数据,鼠标移入显示近三天天气,鼠标移出隐藏
  13. Linux权限中x是什么意思,linux里的drwxr-xr-x代表的意思
  14. 转:听听别人怎么说:VueJS 与 ReactJS
  15. LeetCode-1276. 不浪费原料的汉堡制作方案
  16. pdf合并的工具下载
  17. Mozi僵尸网络(P2P僵尸网络Mozi)
  18. biblatex中参考文献期刊名缩写的实现
  19. npm本地仓库搭建教程
  20. kubernetes存储:local,openEBS,rook ceph

热门文章

  1. SKIL/工作流程/Java部署客户端
  2. 云签约,云培训,云办公,云指挥... 欧冶工业品,用数字化赋能企业高效协作
  3. SUN CEO 斯考特·麦克里尼的印象
  4. HDU 3328 Flipper 魔术纸牌
  5. shp格式全国基础数据 公路 铁路 水系 国界 省界等
  6. 更新pip的时候遇到问题ERROR: Exception: Traceback (most recent call last): File c:\users\dell\anac
  7. TYPE-C接口简介
  8. sql文字转换全拼_获取汉字全拼SQL函数
  9. Oracle数据库学习之事务,去重,空值处理,基本操作符(五)
  10. 【领英如何批量撤回未通过邀请,释放占用名额恢复加人】