业务是订单成交信息,要求计算出成交总金额,每一类商品的金额,区域成交的金额这三个指标。
数据格式:C 202.102.152.3 家具 婴儿床 2000

SparkStreaming读取Kafka中的数据,使用直连方式,然后实现数据的累加,数据保存到Redis中。

OrderCount.scala

package XXXimport io.netty.handler.codec.string.StringDecoder
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.I0Itec.zkclient.ZkClient
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}/*** Create by ...**/
object OrderCount {def main(args: Array[String]): Unit = {//指定组名val group = "g1"//创建SparkConfval conf = new SparkConf().setAppName("KafkaDirectWordCountV2").setMaster("local[4]")//创建SparkStreaming,并设置间隔时间val ssc = new StreamingContext(conf,Seconds(5))//获取广播数据的引用val broadcastRef = IpUtils.broadcastIpRules(ssc,args(0))//指定消费者的topic名字val topic = "orders"//指定Kafka的broker地址(SparkStreaming的Task直接连到Kafka的分区上,用更加底层的API消费,效率更高)val brokerList = "L3:9092,L4:9092,L5:9092"//指定zk的地址,后期更新消费的偏移量时使用(也可以使用Redis,mysql来记录偏移量)val zkQuroum = "L1:2181,L2:2181,L3:2181"//创建stream时使用的topic名字集合,SparkStreaming可同时消费多个topicval topics: Set[String] = Set(topic)//创建一个ZKGroupTopicDirs对象,其实是指定往zk中写入数据的目录,用于保存偏移量val topicDirs = new ZKGroupTopicDirs(group,topic)//获取zookeeper中的路径"g002/offsets/wc"val zkTopicPath = s"${topicDirs.consumerOffsetDir}"//准备Kafka的参数val kafkaParams = Map(//"key.deserializer" -> classOf[StringDeserializer],//"value.deserializer" -> classOf[StringDeserializer],//"deserializer.encoding" -> "GB2312",   //配置读取Kafka中数据的编码"metadata.broker.list" -> brokerList,"group.id" -> group,//从头开始读取数据"auto.offset.reset" -> kafka.api.OffsetRequest.SmallestTimeString)//zookeeper的host和IP,创建一个client,用于更新偏移量//是zookeeper的客户端,可以从zk中读取偏移量数据,并更新偏移量val zkClient = new ZkClient(zkQuroum)//查询改路径下是否有节点(默认有子节点是为我们保存不同partition时生成的)// /g002/offsets/wc/0/10001// /g002/offsets/wc/1/30001// /g002/offsets/wc/2/10001// zkTopicPath -> /g002/offsets/wc/val children = zkClient.countChildren(zkTopicPath)var kafkaStream:InputDStream[(String,String)] = null//如果zookeeper中保存有offset,我们会利用这个offset作为kafkaStream的起始位置var fromOffsets:Map[TopicAndPartition,Long] = Map()//如果保存过offsetif (children > 0) {for (i <- 0 until children) {// /g002/offsets/wc/0/10001// /g002/offsets/wc/0val partitionOffset = zkClient.readData[String](s"$zkTopicPath/$i")// wc/0val tp = TopicAndPartition(topic, i)//将不同 partition 对应的 offset 增加到 fromOffsets 中// wc/0 -> 10001fromOffsets += (tp -> partitionOffset.toLong)}//Key: kafka的key   values: "hello tom hello jerry"//这个会将 kafka 的消息进行 transform,最终 kafka 的数据都会变成 (kafka的key, message) 这样的 tupleval messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message())//通过KafkaUtils创建直连的DStream(fromOffsets参数的作用是:按照前面计算好了的偏移量继续消费数据)//[String, String, StringDecoder, StringDecoder,     (String, String)]//  key    value    key的解码方式   value的解码方式kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)} else {//如果未保存,根据 kafkaParam 的配置使用最新(largest)或者最旧的(smallest) offsetkafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)}//偏移量的范围var offsetRanges = Array[OffsetRange]()//直连方式只有在KafkaDStream的RDD中才能获取偏移量,那么就不能到调用DStream的Transformation//所以只能子在kafkaStream调用foreachRDD,获取RDD的偏移量,然后就是对RDD进行操作了//依次迭代KafkaDStream中的KafkaRDD//如果使用直连方式累加数据,那么就要在外部的数据库中进行累加(用KeyValue的内存数据库(Nosql),Redis)//kafkaStream.foreachRDD里面的业务逻辑是在Driver端执行的kafkaStream.foreachRDD { kafkaRDD =>//判断当前的kafkaStream中的RDD是否有数据if (!kafkaRDD.isEmpty()){//只有KafkaRDD可以强转成HasOffsetRanges,并获取到偏移量offsetRanges = kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRangesval lines: RDD[String] = kafkaRDD.map(_._2)//整理数据val fields: RDD[Array[String]] = lines.map(_.split(" "))//1.计算成交总金额CalculateUtil.calculateIncome(fields)//2.计算商品分类金额CalculateUtil.calculateItem(fields)//3.计算区域成交金额CalculateUtil.calculateZone(fields,broadcastRef)//偏移量更新是在Driver端for (o <- offsetRanges) {//  /g002/offsets/wc/0val zkPath = s"${topicDirs.consumerOffsetDir}/${o.partition}"//将该 partition 的 offset 保存到 zookeeper//  /g002/offsets/wc/0/20000ZkUtils.updatePersistentPath(zkClient, zkPath, o.untilOffset.toString)}}}ssc.start()ssc.awaitTermination()}}

CalculateUtil.scala

用来计算具体业务的工具类。

package XXXimport cn.edu360.sparkIpTest.TestIp
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD/*** Create by 。。。**/
object CalculateUtil {def calculateIncome(fields:RDD[Array[String]]) : Unit = {//将计算好的数据写到redis中val priceRDD: RDD[Double] = fields.map(_(4).toDouble)//reduce是一个Action,会把结果返回到Driver端//将当前批次的总金额返回了val sum: Double = priceRDD.reduce(_+_)//获取一个jedis连接val conn = jedisConnectionPool.getConnection//将历史值和当前的值进行累加//conn.set(Constant.TOTAL_INCOME,sum.toString)conn.incrByFloat(Constant.TOTAL_INCOME,sum)//释放连接conn.close()}/*** 计算分类的成交金额* @param fields 整理后的数据*/def calculateItem(fields:RDD[Array[String]]) : Unit = {//对fields的map方法是在Driver端执行的val itemAndPrice: RDD[(String, Double)] = fields.map(arr => {//取出分类val item = arr(2)//取出金额val price = arr(4).toDouble(item, price)})//按照商品分类进行聚合val reduced: RDD[(String, Double)] = itemAndPrice.reduceByKey(_+_)//将当前批次的数据累加到Redis中//foreachPartition是一个Action//现在这种方式,jedis是在Driver端创建的//在Driver端拿jedis连接不好//val conn = jedisConnectionPool.getConnection()reduced.foreachPartition(part => {//获取一个jedis连接//这个连接是在executor中获取的//jedisConnectionPool在一个executor进程中只有一个实例(因为jedisConnectionPool是一个object,单例)val conn = jedisConnectionPool.getConnectionpart.foreach(t => {//一个连接更新多条数据conn.incrByFloat(t._1,t._2)})//将当前分区中的数据更新完再关闭连接conn.close()})}/*** 根据IP计算归属地* @param fields   整理后的数据* @param broadcastRef   广播数据的引用*/def calculateZone(fields:RDD[Array[String]], broadcastRef:Broadcast[Array[(Long, Long, String)]]) : Unit = {val provinceAndPrice: RDD[(String, Double)] = fields.map(arr => {val ip = arr(1)//获取订单金额val price = arr(4).toDouble//将数据中的IP转换成二进制val ipNum = TestIp.ip2Long(ip)//在executor中获取到广播的全部规则val allRules: Array[(Long, Long, String)] = broadcastRef.value//二分法查找var province = "未知"val index: Int = TestIp.binarySearch(allRules, ipNum)if (index != -1) {province = allRules(index)._3}//省份,订单金额(province, price)})//聚合val reduced: RDD[(String, Double)] = provinceAndPrice.reduceByKey(_+_)//将结果更新到redis中reduced.foreachPartition(part => {//获取jedis连接val conn = jedisConnectionPool.getConnectionpart.foreach(t => {conn.incrByFloat(t._1,t._2)})conn.close()})}}

IpUtils.scala

用来处理广播的IP规则的工具类。

package XXXimport org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext/*** Create by 。。。**/
object IpUtils {def broadcastIpRules(ssc: StreamingContext, ipRulesPath: String) : Broadcast[Array[(Long, Long, String)]] = {//先获取sparkContextval sc = ssc.sparkContext//将ip.txt读取到HDFS中val rulesLines: RDD[String] = sc.textFile(ipRulesPath)//整理ip规则数据//这里是在Executor中执行的,每个Executor只计算部分的IP规则数据val ipRulesRDD: RDD[(Long, Long, String)] = rulesLines.map(line => {val fields = line.split("[|]")val startNum = fields(2).toLongval endNum = fields(3).toLongval province = fields(6)(startNum, endNum, province)})//需要将每个Executor端执行完的数据收集到Driver端val rulesInDriver: Array[(Long, Long, String)] = ipRulesRDD.collect()//再将Driver端的完整的数据广播到Executor端//生成广播数据的引用val broadcastRef: Broadcast[Array[(Long, Long, String)]] = sc.broadcast(rulesInDriver)broadcastRef}}

jedisConnectionPool.scala

用来获取jedis连接。

package XXXimport redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}/*** Create by 。。。**/
object jedisConnectionPool {val config = new JedisPoolConfig()//最大连接数config.setMaxTotal(20)//最大空闲连接数config.setMaxIdle(10)//当调用borrow Object方法时,是否进行有效性检查 -->config.setTestOnBorrow(true)//10000代表超时时间(10秒)val pool = new JedisPool(config,"192.168.67.134",6379,10000,"")def getConnection:Jedis = {pool.getResource}def main(args: Array[String]): Unit = {val conn = jedisConnectionPool.getConnection
//    conn.set("income","1000")
//
//    val r1 = conn.get("tianmao")
//    println(r1)
//
//    conn.incrBy("tianmao",-20)
//
//    val r2 = conn.get("tianmao")
//    println(r2)
//
//    conn.close()val r = conn.keys("*")import scala.collection.JavaConversions._for (p <- r){println(p + ":" + conn.get(p))}}}

TestIp.scala

对IP进行处理–将IP转换为十进制,通过二分法查找获取对应的省份

package XXXimport java.sql.{Connection, DriverManager, PreparedStatement}import scala.io.{BufferedSource, Source}object TestIp {//将IP转化为十进制def ip2Long(ip: String): Long = {val fragments = ip.split("[.]")var ipNum = 0Lfor (i <- 0 until fragments.length){ipNum =  fragments(i).toLong | ipNum << 8L}ipNum}//定义读取ip.txt规则,只要有用的数据def readRules(path:String):Array[(Long,Long,String)] = {//读取ip.txtval bf: BufferedSource = Source.fromFile(path)//对ip.txt进行整理val lines: Iterator[String] = bf.getLines()//对ip进行整理,并放入内存val rules: Array[(Long, Long, String)] = lines.map(line => {val fileds = line.split("[|]")val startNum = fileds(2).toLongval endNum = fileds(3).toLongval province = fileds(6)(startNum, endNum, province)}).toArrayrules}//二分法查找def binarySearch(lines: Array[(Long, Long, String)], ip: Long) : Int = {var low = 0var high = lines.length - 1while (low <= high) {val middle = (low + high) / 2if ((ip >= lines(middle)._1) && (ip <= lines(middle)._2))return middleif (ip < lines(middle)._1)high = middle - 1else {low = middle + 1}}-1}def data2MySQL(it: Iterator[(String, Int)]): Unit = {//一个迭代器代表一个分区,分区中有多条数据//先获得一个JDBC连接val conn: Connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8", "root", "")//将数据通过Connection写入到数据库val pstm: PreparedStatement = conn.prepareStatement("INSERT INTO access_log VALUES (?, ?)")//将分区中的数据一条一条写入到MySQL中it.foreach(tp => {pstm.setString(1, tp._1)pstm.setInt(2, tp._2)pstm.executeUpdate()})//将分区中的数据全部写完之后,在关闭连接if(pstm != null) {pstm.close()}if (conn != null) {conn.close()}}def main(args: Array[String]): Unit = {//数据是在内存中val rules: Array[(Long, Long, String)] = readRules("E:/Spark视频/小牛学堂-大数据24期-06-Spark安装部署到高级-10天/spark-04-Spark案例讲解/课件与代码/ip/ip.txt")//将ip地址转换成十进制val ipNum = ip2Long("1.24.6.56")//查找val index = binarySearch(rules,ipNum)//根据脚标到rules中查找对应的数据val tp = rules(index)val province = tp._3println(province)}}

Constant.scala

计算总的成交金额,定义保存redis中的key

package XXX/*** Create by 。。。**/
object Constant {val TOTAL_INCOME = "TOTAL_INCOME"}

SparkStreaming整合Kafka(0.8.2.1)计算不同业务指标并实现累加(结合Redis)相关推荐

  1. 2021年大数据Spark(四十三):SparkStreaming整合Kafka 0.10 开发使用

    目录 整合Kafka 0-10-开发使用 原理 1.Direct方式 2.简单的并行度1 : 1 ​​​​​​​API 注意 ​​​​​​​代码实现-自动提交偏移量到默认主题 ​​​​​​​代码实现- ...

  2. SparkStreaming整合Kafka(Offset保存在zookeeper上,Spark2.X + kafka0.10.X)

    先来一段到处都有的原理(出处到处都有,就不注明了) Streaming和Kafka整合有两种方式--Receiver和Direct,简单理解为:Receiver方式是通过zookeeper来连接kaf ...

  3. 超详细!一文告诉你 SparkStreaming 如何整合 Kafka !附代码可实践

    来源 | Alice菌 责编 | Carol 封图 |  CSDN 下载于视觉中国 相信很多小伙伴已经接触过 SparkStreaming 了,理论就不讲太多了,今天的内容主要是为大家带来的是 Spa ...

  4. 超详细!一文详解 SparkStreaming 如何整合 Kafka !附代码可实践

    来源 | Alice菌 责编 | Carol 封图 |  CSDN 下载于视觉中国 出品 | CSDN(ID:CSDNnews) 相信很多小伙伴已经接触过 SparkStreaming 了,理论就不讲 ...

  5. Kafuka面试(整合Kafka两种模式区别)

    整合Kafka两种模式说明 ★面试题:Receiver & Direct 开发中我们经常会利用SparkStreaming实时地读取kafka中的数据然后进行处理,在spark1.3版本后,k ...

  6. Flume+Kafka+SparkStreaming整合

    目录 1.Flume介绍.2 1.1 Flume数据源以及输出方式.2 1.2 Flume的核心概念.2 1.3 Flume结构.2 1.4 Flume安装测试.3 1.5 启动flume4 2.Ka ...

  7. 2021年大数据Spark(四十二):SparkStreaming的Kafka快速回顾与整合说明

    目录 Kafka快速回顾 消息队列: 发布/订阅模式: Kafka 重要概念: 常用命令 整合说明 两种方式 两个版本API 在实际项目中,无论使用Storm还是SparkStreaming与Flin ...

  8. SpringbBoot2.0整合kafka

    在配置项目配置整合kafka时遇到了坑,要进行整合的小伙伴们要注意版本一致的问题哦,运行不成功一定是版本的问题. 1 kafka安装 1.1 Windows 查看安装https://blog.csdn ...

  9. sparkStreaming连接kafka整合hbase和redis

    sparkStreaming消费kafka数据,并将数据保存到redis和hbase当中去,实现实时 import org.apache.hadoop.hbase.client.{Admin, Con ...

最新文章

  1. php-mysql管理利器 adminer
  2. 函数组:SPO1/2/3/4/5/6/8
  3. boost::serialization相关的测试程序
  4. [Flink]Flink1.3 Stream指南六 事件时间与处理时间
  5. 使用 FTP 迁移 SQL Server 数据_迁移数据_快速入门(SQL Server)_云数据库 RDS 版-阿里云...
  6. VBA GetOpenFilename 方法
  7. maven自带clean_maven之clean、install命令
  8. LeetCode刷题(35)--Permutation Sequence
  9. ruby+selenium-webdriver一步一步完成自动化测试(6)—-生成测试报告
  10. 全球及中国电子材料市场需求分析与十四五投资潜力预测报告2021年版
  11. MCS-51单片机的硬件结构
  12. Ubuntu虚拟机中安装分区(可用)
  13. layui开关按钮及代码实现
  14. Task01 文件处理与邮件自动化
  15. eSIM卡业务开通地区
  16. flash as3.0 图片切换
  17. 裁员潮此起彼伏,转行数据分析师仍旧可期
  18. 弹性法计算方法的mck法_SAM4E单片机之旅——9、UART与MCK之MAINCK
  19. WebGIS教学——瓦片行列号换算
  20. 金蝶EAS BOS上如何打补丁

热门文章

  1. LPL2019职业联赛春季+夏季赛数据分析
  2. 《绘王K28开箱测评》数位板
  3. 淘宝补单计划,如何补基础销量,一周补单量
  4. 开源不仅是Red Hat的软件
  5. P3166 [CQOI2014]数三角形
  6. 惠普HP CM1312nfi 彩色激光打印机如何添加网络打印机
  7. 远程网络教学系统UML用例图
  8. 总结大佬经验,如何学习STM32?(入门、进阶)
  9. 各种居中、对齐、页面布局问题,整理思路╮(╯▽╰)╭
  10. Dell 服务器 用板载网口访问iDrac 并设置风扇静音