中国移动实时数据分析-基于spark+kafka+flume
这两天主要是做了中国移动的实时数据分析一个小项目(可以说是demo了),这里记录下来整个过程里面遇到的坑,首先安装好flume,kafka,spark(基于代码本地运行可以不安装),redis,zookeeper 主要是为了熟悉一下整个的一个spark-streaming的一个整个流程,还有就是了解调优的地方。
上述假设已经安装好了相应的组件,然后就开始正式的踩坑之路:
1.编写一个java程序去读取原始数据文件,模拟1s进行文件的插入一行,原始的数据文件格式如下:
坑a
.整个的数据格式是json,但是是一整行的。。。。
解决a1:于是就想这去把这样的数据转化为json格式的,就去捣鼓了一下notepad++转json格式的方法:notepad++上面的菜单栏中,插件-> plugins Admin..->search中直接查找就好了,然后找找有个install的按钮点击一下就ok了,然后各种确定,之后notepad++会自动重启,重启之后上面的菜单栏中,插件->就会多出一个JSON Viewer,然后就可以了。但是我操作的时候遇到了notepad++重启之后没有出现JSON Viewer(但是后来又出现了),
解决a2:于是又去找了idea实现json格式的方法:setting->keymap->main enum->code->reformat code 这个功能是将文本格式化,该功能的快捷键默认是ctrl+shift+l,但是这个快捷键组合是有冲突的,所以将其转化为ctrl+shift+s,修改后进行保存,然后创建一个xxx.json的文件,复制一行json数据到该文件中,然后全选,按下ctrl+shift+s即可转化为标准的json文件格式
相应的java实现代码如下:
import java.io.*;import java.util.ArrayList;import java.util.List; public class WriteCMCC { public static void main(String[] args) { List<String> allLines = getCmcc(args[0]); System.out.println(allLines.size()); writeCmcc(allLines, args[1]); } /** * 一次性读取cmcc中的数据 * @return 存放在list中 */ private static List<String> getCmcc(String path) { BufferedReader br = null; List<String> allLines = new ArrayList<String>(); try { br = new BufferedReader(new FileReader(new File(path))); String line = ""; while ((line = br.readLine()) != null) { allLines.add(line); } } catch (Exception e) { e.printStackTrace(); }finally { try { if (br != null) br.close(); } catch (IOException e) { e.printStackTrace(); } } return allLines; } /** * 写入cmcc中的数据,一次写入一个list的数据集 */ private static void writeCmcc(List<String> cmcc, String path) { BufferedWriter bw = null; try { bw = new BufferedWriter(new FileWriter(new File(path))); for(String line : cmcc) { bw.write(line); bw.flush(); Thread.sleep(1000); bw.newLine(); } } catch (Exception e) { e.printStackTrace(); }finally { try { if (bw != null) bw.close(); } catch (IOException e) { e.printStackTrace(); } } }}
代码写好,然后测试完,然后打成jar包,丢到Linux准备运行。
java -jar /home/soft/jar/write_cmcc_5_seconds.jar /home/soft/cmcc.log /home/soft/cmcc/cmcc_write.log
2.flume编写相应的conf去把数据抽取到kafka中(cmcc.conf)
先启动zookeeper,启动kafka并创建topic(cmcc):
zookeeper启动命令:
/home/soft/zookeeper-3.4.6/bin/zkServer.sh start(每个节点都需要启动)
kafka启动命令:
/home/soft/kafka_2.11-0.10.1.0/bin/kafka-server-start.sh /home/soft/kafka_2.11-0.10.1.0/config/server.properties &
kafka创建topic:
bin/kafka-topics.sh --zookeeper os1:2181,os2:2181,os3:2181 --create --topic cmcc --partitions 6 --replication-factor
kafka查看所有的topic:
bin/kafka-topics.sh --zookeeper os1:2181,os2:2181,os3:2181 --list
然后编写conf测试(cmcc.conf):
a1.sources = s1
a1.channels = c1
#这里先不使用该种方式去读取文件,因为该方式flume会出如下的错误
#java.lang.IllegalStateException: File has been modified since being read: /home/soft/cmcc/cmcc_write.log
#原因:出现这个问题的原因是,当我们拷贝一个文件的时候,一些对文件进行了修改
#解决:最好的方法就是,确保大文件完全拷贝后,再让flume来读取,思路是将拷贝中的文件加上一个多余的后缀,flume一开始不会读取文件,当文件拷贝完成后去掉多余的后缀,这个时候flume就会针对新文件进行读取。
#a1.sources.s1.type =spooldir
#a1.sources.s1.spoolDir =/home/soft/cmcc
#a1.sources.s1.fileHeader= true
a1.sources.s1.type=exec
a1.sources.r1.command = tail -F /root/app_weichat_login.log
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = os1:9092,os2:9092,os3:9092
#创建好相应的topic
a1.channels.c1.kafka.topic = cmcc
#这个是自己定义的没啥事情
a1.channels.c1.kafka.consumer.group.id = flume-consumer
#这个一定要设置,否则就是个坑,写入到kafka中的数据会被追加进一些数据,而且还是乱码的
a1.channels.c1.parseAsFlumeEvent = false
#拼接source和channel
a1.sources.s1.channels=c1
flume启动命令:下面的a1就对应着上面的a1(控制台打印信息)
bin/flume-ng agent -n a1 -c conf -f conf/cmcc.conf -Dflume.root.logger=INFO,console
3.spark程序去读取kafka的中的数据并将结果存放至redis中
启动redis:/usr/local/redis/bin/redis-server /usr/local/redis/etc/redis.conf
程序相应的配置:resources -> application.conf
#kafka的相关参数 kafka.topic = "cmcc" kafka.broker.list="os1:9092,os2:9092,os3:9092" kafka.group.id="cmcc" redis.host="xxx.xxx.xxx.xxx" redis.db.index="0"
主程序代码:scala -> BootStarpApp
package appimport java.text.SimpleDateFormatimport com.alibaba.fastjson.JSON import org.apache.spark.SparkConf import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies} import org.apache.spark.streaming.{Seconds, StreamingContext} import utils.{AppParams, Jpools}object BootStarpApp {def main(args: Array[String]): Unit = {/*** 错误集:* 1.Caused by: org.apache.kafka.common.KafkaException: org.codehaus.jackson.map.deser.std.StringDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer* 错误解释,kafka在进行序列化实例对象的时候出错* 查找原因:* org.codehaus.jackson.map.deser.std.StringDeserializer是我们AppParas中导入的类型,可能是导错了,* 查看后发现应该导入:import org.apache.kafka.common.serialization.StringDeserializer* 2. 程序出现INFO:Marking the coordinator os3:9092 (id: 2147483645 rack: null) dead for group cmcc_test2,且程序不再执行下去* 原因:因为kafka-clent程序默认读取到kafka上的信息之后将host:os3返回作为主机节点去获取数据,但是在本机中没有配置相应的host与ip的映射,所有这里就无法直接进行访问os3* 解决办法;在windows中配置相应的ip与hostname的映射(kafka中的broker节点)* 3.json解析出错:error parse false* 原因json格式错误** 4.flume的坑:a0.channels.c1.parseAsFlumeEvent = false 1.7以后默认为true* 如果设置此项为 true,Kafka Sink 则会把数据按照标准的 Flume Event 格式(即Headers域和body域结合的数据结构)发送。Flume Event 中的 Headers 域通常是一些附加字段,可以是时间戳(比如时间戳拦截器指定的时间戳)、文件名(比如 spooldir Source 开启的 fileHeader = true)等信息。但是 1.7.0 版本的 Flume 一旦开启此配置,会导致 Headers 域里面的信息乱码** 5.flume异常崩溃 File has been modified since being read* 原因:出现这个问题的原因是,当我们拷贝一个文件的时候,一些对文件进行了修改,就会出现这个错误* 解决:最好的方法就是,确保大文件完全拷贝后,再让flume来读取,思路是将拷贝中的文件加上一个多余的后缀,flume一开始不会读取文件,当文件拷贝完成后去掉多余的后缀,这个时候flume就会针对新文件进行读取。* 另外针对大文件,flume的解决方案可以设置一个文件完成后缀:*/val sparkConf = new SparkConf()sparkConf.setAppName("中国移动运营实时监控平台")sparkConf.setMaster("local[*]")/***将rdd以kryo的序列化保存,以减少内存的使用*/sparkConf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")/*** 对rdd进行压缩,使用内存空间换去处理时间的方式,减少内存的使用*/sparkConf.set("spark.rdd.compress", "true")/****/sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "100")/*** 进行优雅的停止程序*/sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")/*** 每两秒执行一个批次*/val ssc = new StreamingContext(sparkConf, Seconds(2))/*** 获取kafka的数据* LocationStrategies:位置策略,如果kafka的broker节点与Excutor在同一台机器上给一种策略,不再一台机器上给另一种策略* 设定策略之后会以最有的策略进行获取数据* 一般在企业中kafka节点与Excutor不会放到一台机器的,原因是kafka是消息存储的,Executor是用来做消息计算的* 因此计算与存储需要分开,存储对磁盘要求高,计算对内存和cpu的要求更高* 如果Executor节点跟Broker的节点在一起的话就使用PreferBrokers策略,不再一起的话就使用preferConsisent策略* 使用preferConsisent策略的话,将来在kafka中拉去数据以后尽量将数据分散到所有的Executor上*/val stream = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](AppParams.topic, AppParams.kafkaParams))stream.foreachRDD(rdd => {/*** {* "bussinessRst": "0000",* "channelCode": "0705",* "chargefee": "10000",* "clientIp": "125.82.117.133",* "endReqTime": "20170412080609613",* "idType": "01",* "interFacRst": "0000",* "logOutTime": "20170412080609613",* "orderId": "384681890175026754",* "prodCnt": "1",* "provinceCode": "280",* "requestId": "20170412080450886738519397327610",* "retMsg": "成功",* "serverIp": "172.16.59.241",* "serverPort": "8088",* "serviceName": "sendRechargeReq",* "shouldfee": "9950",* "startReqTime": "20170412080609503",* "sysId": "15"* }*//*** 业务逻辑:* serviceName:reChargeNotifyReq,则为充值通知的记录* requestId:包含充值的日期(订单开始时间)* bussinessRst:是否成功 0000 为成功,其他为不成功* chargefee:充值的金额* receiveNotifyTime:订单结束时间**//*** 我们可以通过serviceName字段来确定,如果该字段是reChargeNotifyReq则代表该条数据是充值通知部分的数据。* 获取所有的充值通知日志*/val baseData = rdd.map(cr => {print(cr.value())JSON.parseObject(cr.value())}).filter(obj => obj.getString("serviceName").equalsIgnoreCase("reChargeNotifyReq")).cache()/*** 获取每天充值成功的订单笔数* 回忆:* wordcount flatMap-》map-》reduceByKey*/val totalSucc = baseData.map(obj=> {//获取日期val reqId = obj.getString("requestId")//获取日期val day = reqId.substring(0, 8)//取出该条充值是否成功的标志val result = obj.getString("bussinessRst")val flag = if(result.equals("0000")) 1 else 0(day, flag)}).reduceByKey(_+_)/*** 获取充值成功的订单金额*/val totalMoney = baseData.map(obj=> {val reqId = obj.getString("requestId")//获取日期val day = reqId.substring(0, 8)//去除该条充值是否成功的标记val result = obj.getString("bussinessRst")val fee = if(result.equals("0000")) obj.getString("chargefee").toDouble else 0(day, fee)}).reduceByKey(_+_)//总订单数val total = baseData.count()/*** 获取充值成功的充值时长*/val totalTime = baseData.map(obj=> {var reqId = obj.getString("requestId")//获取日期val day = reqId.substring(0, 8)//取出该条充值是否成功的标示val result = obj.getString("bussinessRst")//时间格式为:yyyyMMddHHmissSSSval endTime = obj.getString("receiveNotifyTime")val startTime = reqId.substring(0, 17)val format = new SimpleDateFormat("yyyyMMddHHmissSSS")val cost = if(result.equals("0000")) format.parse(endTime).getTime - format.parse(startTime).getTime else 0(day, cost)}).reduceByKey(_+_)/*** 将数据存储到redis中:* (CMCC-20170412,35)*/totalSucc.foreachPartition(itr=> {val jedis = Jpools.getJedisitr.foreach(tp => {// print("CMCC-"+tp._1, tp._2)jedis.incrBy("CMCC-"+tp._1, tp._2)})})})ssc.start()ssc.awaitTermination()} }
两个工具类:
package utilsimport com.typesafe.config.ConfigFactory import org.apache.kafka.common.serialization.StringDeserializerobject AppParams {/**Scala中使用关键字lazy来定义惰性变量,实现延迟加载(懒加载)。惰性变量只能是不可变变量,并且只有在调用惰性变量时,才会去实例化这个变量。load中可以指定相应的配置文件,但是不指定的情况下默认去读取resources下的application.conf文件默认规则:application.conf->application.json->application.properties**/private lazy val config = ConfigFactory.load()val redisHost = config.getString("redis.host")val selectDBIndex = config.getInt("redis.db.index")/*** 返回订阅的主题*/val topic = config.getString("kafka.topic").split(",")/*** kafka集群所在的主机和端口*/val brokers:String = config.getString("kafka.broker.list")/*** 消费者的id*/val groupId = config.getString("kafka.group.id")/*** 将kafka的相关参数进行分装到map中*/val kafkaParams = Map[String, Object]("bootstrap.servers" -> brokers,"key.deserializer" -> classOf[StringDeserializer],"value.deserializer"-> classOf[StringDeserializer],"group.id"-> groupId,"auto.offset.reset" -> "earliest","enable.auto.commit" -> "false") }
package utils import org.apache.commons.pool2.impl.GenericObjectPoolConfig import redis.clients.jedis.JedisPool/*** 创建一个redis的线程池*/ object Jpools {private val poolConfig = new GenericObjectPoolConfigpoolConfig.setMaxIdle(5) //最大的空闲连接数为5,连接池中最大的空闲连接数,默认是8poolConfig.setMaxTotal(2000) //最大支持的连接数量,默认也是8//连接池是私有的,不能对外进行公开访问private lazy val jedisPool = new JedisPool(poolConfig, AppParams.redisHost)def getJedis = {val jedis = jedisPool.getResourcejedis.select(AppParams.selectDBIndex)jedis} }
pom文件
<dependencies><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>2.2.0</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>2.2.0</version></dependency><dependency><groupId>com.typesafe</groupId><artifactId>config</artifactId><version>1.3.3</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.11</artifactId><version>2.2.0</version></dependency><!-- 导入kafka的依赖--><!-- <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.11</artifactId><version>0.10.1.0</version></dependency>--><!-- 指定kafka-client API的版本--><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>0.10.1.0</version></dependency><!-- 导入spark streaming 与kafka的依赖包--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.11</artifactId><version>2.2.0</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.46</version></dependency><dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>2.9.0</version></dependency></dependencies>
问题总结:
1.json格式的转换 (已解决)
2.flume读取数据到kafka后数据乱码增多问题(已解决)
3.flume spooldir 读取文件的同时对文件更改造成的java.lang.IllegalStateException:File has been modified since being read:问题 (待解决)
4.上述spark主程序代码优化问题 (待解决)
转载于:https://www.cnblogs.com/zyc-2019/p/10596260.html
中国移动实时数据分析-基于spark+kafka+flume相关推荐
- 第53课实战操作Kafka+Flume成功! Spark大型项目广告点击项目技术骨架实现之Spark+Kafka+Flume实战
第53课实战操作Kafka+Flume成功! Spark大型项目广告点击项目技术骨架实现之Spark+Kafka+Flume实战 flume 安装在集群的worker4上,地址192.168.189. ...
- mysql增量同步kafka_MySQL数据实时增量同步到Kafka - Flume
写在前面的话 需求,将MySQL里的数据实时增量同步到Kafka.接到活儿的时候,第一个想法就是通过读取MySQL的binlog日志,将数据写到Kafka.不过对比了一些工具,例如:Canel,Dat ...
- 基于HDP使用Flume实时采集MySQL中数据传到Kafka
注意:HDP中Kafka broker的端口是6667,不是9092 如有需要请看:基于HDP使用Flume实时采集MySQL中数据传到Kafka+Hive/HDFS 1.将flume-ng-sql- ...
- 基于HDP使用Flume实时采集MySQL中数据传到Kafka+HDFS或Hive
环境版本: HDP-2.5.3 注意:HDP中Kafka broker的端口是6667,不是9092 如果只sink到kafka请看这篇:基于HDP使用Flume采集MySQL中数据传到Kafka 前 ...
- 基于Spark平台的协同过滤实时电影推荐系统
文章目录 摘要 0 引言 1 协同过滤算法 2 实时推荐服务 3 电影推荐系统设计部署 3.1 架构设计 3.2 系统功能设计 3.3 数据库设计 4 系统运行效果 5 结语 参考文献 摘要 摘要:随 ...
- clickhouse 航空数据_趣头条基于Flink+ClickHouse的实时数据分析平台
原标题:趣头条基于Flink+ClickHouse的实时数据分析平台 分享嘉宾:王金海 趣头条 编辑整理:王彦 内容来源:Flink Forward Asia 出品平台:DataFunTalk 导读: ...
- grafana计算不同时间的差值_大数据时代!如何基于Spark Streaming构建实时计算平台...
随着互联网技术的迅速发展,用户对于数据处理的时效性.准确性与稳定性要求越来越高,如何构建一个稳定易用并提供齐备的监控与预警功能的实时计算平台也成了很多公司一个很大的挑战. 自2015年携程实时计算平台 ...
- 项目体系架构设计——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(四)
系列文章目录 初识推荐系统--基于Spark平台的协同过滤实时电影推荐系统项目系列博客(一) 利用用户行为数据--基于Spark平台的协同过滤实时电影推荐系统项目系列博客(二) 项目主要效果展示--基 ...
- 利用用户行为数据——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(二)
系列文章目录 初识推荐系统--基于Spark平台的协同过滤实时电影推荐系统项目系列博客(一) 利用用户行为数据--基于Spark平台的协同过滤实时电影推荐系统项目系列博客(二) 项目主要效果展示--基 ...
最新文章
- JS、CSS中的相对路径
- React开发(170):ant design datapicker限制选择时间
- sql 查讯本日 本周 本月
- CCKS2018参会总结项目
- [BZOJ4987]Tree
- 途观l怎么使用_值得考虑的SUV车型,昂科威、探岳、途观L,你会如何选?
- 分布式消息中间件之kafka设计思想及基本介绍(一)
- 和jwt应用场景_一文了解web无状态会话token技术JWT
- Atitit 文件系统概论 艾提拉著 目录 1. NTFS系统	1 1.1. NTFS文件系统概述	1 1.2. 1.1 文件系统简介 1.2 NTFS文件系统	1 1.3. 第2章
- GIS软件开发入门需要学习哪些内容?
- Luarocks的使用
- 魔兽世界稳定服务器,魔兽世界美服服务器趋于稳定 排队新技术将实装
- 简述神经网络的优点和缺点,人工神经网络优缺点
- linux user32.lib,Linux之#pragma的用法
- Codeforces 480D Parcels 动态规划
- Typora自定义主题分享 (Mac风、图片立体感...)
- Linux CentOS7.0 使用root登录桌面
- 使用ivx中表格组件的经验总结
- 我常去的ios开发论坛/iphone开发论坛
- MAC M1安装telnet
热门文章
- 网站域名到期无法续费后还能买回来吗?
- 两台win 7系统电脑 一台双网卡 共享上网
- 无线群控服务器转让,群控客户端绑定服务器文件
- PHP开发群控,玩客云改群控的试玩体验,群控插件开发原来如此简单
- Android之自定义Dialog(Layout引入布局)
- python爬虫之Scrapy(一)
- java计算机毕业设计临沂旅游咨询系统源码+系统+数据库+lw文档+mybatis+运行部署
- 关于量子计算机的一些整理 (精心整理原创) (1)
- absolute和fixed定位的区别
- 【原创】【歌曲评论】【之二】【白色风车】