文章来自:http://www.cnblogs.com/hark0623/p/4172462.html   转发请注明

object LogicHandle {def main(args: Array[String]) {//添加这个不会报执行错误val path = new File(".").getCanonicalPath()System.getProperties().put("hadoop.home.dir", path);new File("./bin").mkdirs();new File("./bin/winutils.exe").createNewFile();//val sparkConf = new SparkConf().setAppName("SensorRealTime").setMaster("local[2]")val sparkConf = new SparkConf().setAppName("SensorRealTime")val ssc = new StreamingContext(sparkConf, Seconds(20))val hostname = "localhost"val port = 2345val storageLevel = StorageLevel.MEMORY_ONLYval flumeStream = FlumeUtils.createStream(ssc, hostname, port, storageLevel)val lhc = new LogicHandleClass();//日志格式化模板val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");val sdfHour = new SimpleDateFormat("HH");val sdfMinute = new SimpleDateFormat("mm")//存储数据的hash对象  key/value存储  根据文档规则,使用各统计指标的key/valuevar redisMap = new HashMap[String, String]
      flumeStream.foreachRDD(rdd => {val events = rdd.collect()//println("event count:" + events.length)var i = 1for (event <- events) {val sensorInfo = new String(event.event.getBody.array()) //单行记录//单行记录格式化val arrayFileds = sensorInfo.split(",")if (arrayFileds.length == 6) {val shopId = arrayFileds(0) //店内编号
val floorId = shopId.substring(0, 5) //楼层编号val mac = arrayFileds(1)val ts = arrayFileds(2).toLong //时间戳val time = sdf.format(ts * 1000)var hour = sdfHour.format(ts * 1000)var minute = sdfMinute.format(ts * 1000)var allMinute = hour.toInt * 60 + minute.toIntval x = arrayFileds(3)val y = arrayFileds(4)val level = arrayFileds(5)//后边就是我的业务代码了,省略了}}//存储至redis中
      lhc.SetAll(redisMap)})ssc.start()ssc.awaitTermination()}
}

转载于:https://www.cnblogs.com/hark0623/p/4172462.html

分享一下spark streaming与flume集成的scala代码。相关推荐

  1. Spark Streaming和Flume集成指南V1.4.1

    Apache Flume是一个用来有效地收集,聚集和移动大量日志数据的分布式的,有效的服务.这里我们解释一下怎样配置Flume和Spark Streaming来从Flume获取数据.这里有两个方法. ...

  2. Spark Streaming整合flume实战

    Spark Streaming对接Flume有两种方式 Poll:Spark Streaming从flume 中拉取数据 Push:Flume将消息Push推给Spark Streaming 1.安装 ...

  3. spark kafka java api_java实现spark streaming与kafka集成进行流式计算

    java实现spark streaming与kafka集成进行流式计算 2017/6/26补充:接手了搜索系统,这半年有了很多新的心得,懒改这篇粗鄙之文,大家看综合看这篇新博文来理解下面的粗鄙代码吧, ...

  4. DStream实战之Spark Streaming整合fulme实战, Flume向Spark Streaming中push推数据 36

    前言 本文所需要的安装包&Flume配置文件,博主都已上传,链接为本文涉及安装包&Flume配置文件本文涉及的安装包&Flume配置文件,请自行下载~ flume作为日志实时采 ...

  5. Spark Streaming笔记整理(二):案例、SSC、数据源与自定义Receiver

    [TOC] 实时WordCount案例 主要是监听网络端口中的数据,并实时进行wc的计算. Java版 测试代码如下: package cn.xpleaf.bigdata.spark.java.str ...

  6. 基于 ELK Stack 和 Spark Streaming 的日志处理平台设计与实现

    概述 大数据时代,随着数据量不断增长,存储与计算集群的规模也逐渐扩大,几百上千台的云计算环境已不鲜见.现在的集群所需要解决的问题不仅仅是高性能.高可靠性.高可扩展性,还需要面对易维护性以及数据平台内部 ...

  7. Spark Streaming实时数据分析

    1.Spark Streaming功能介绍 1)定义 Spark Streaming is an extension of the core Spark API that enables scalab ...

  8. Spark Streaming简介 (三十四)

    Spark Streaming简介 Spark Streaming 是 Spark 提供的对实时数据进行流式计算的组件.它是 Spark 核心 API 的一个扩展,具有吞吐量高.容错能力强的实时流数据 ...

  9. Spark Streaming 流式计算实战

    这篇文章由一次平安夜的微信分享整理而来.在Stuq 做的分享,原文内容. 业务场景 这次分享会比较实战些.具体业务场景描述: 我们每分钟会有几百万条的日志进入系统,我们希望根据日志提取出时间以及用户名 ...

最新文章

  1. 2016已经过去,2017即将开始
  2. C++对象的内存分析(5)
  3. 网站不允许上传asp cer cdx htr等文件时
  4. 1805b: Coronavirus Spike Protein Binder Design 寻找蛋白质阻止新冠病毒感染人类细胞
  5. qt在加入Q_OBJECT宏之后出现编译错误
  6. Matlab--Figure界面工具栏使用简要说明
  7. ubuntu下配置php环境
  8. Deep Convolutional Network Cascade for Facial Point Detection论文算法解析
  9. mysql之前缀索引
  10. java大于0的正则_求一个 大于0且小于1 的正则表达式(无论几位小数)
  11. element-ui 删除input框尾部默认图标和获取焦点边框高亮问题
  12. JS严格模式(use strict)
  13. 【Linux 0.11】第九章 块设备驱动程序
  14. 【深度】新派LaaS协议Elephant:重振DeFi赛道发展的关键
  15. 解决catkin_make时出现make[2]: *** No rule to make target ‘/usr/lib/libOpenNI2.so‘, needed by ‘*******‘。
  16. flutter与RN对比
  17. Python 实现远程监控中心
  18. Windows 10 ISO 官方镜像下载
  19. HDU 5761 Rower Bo 物理题(积分求时间)
  20. 自连接、外连接和自连接查询

热门文章

  1. Android 添加 *.arr
  2. Trie树实现[ java ]
  3. HADOOP2.5.0_64安装日志
  4. android 将byte[]保存到手机
  5. 02-java常量变量数据类型
  6. Context结构图
  7. 从fragment开始向上查找实现了某个接口的parent Fragment 或者 Activity
  8. Android 自定义Adapter以实现自定义填充ListView的Item
  9. Fastify 2.2.0 和 1.14.5 发布,极速 Node.js Web 框架
  10. java.lang包—对象基类Object