SparkStreaming 系列(二)kafka与Streaming集成direct流实战---多流集群高并发场景代码演示
写在前面: 我是
「nicedays」
,一枚喜爱做特效,听音乐,分享技术的大数据开发猿
。这名字是来自world order乐队的一首HAVE A NICE DAY
。如今,走到现在很多坎坷和不顺,如今终于明白nice day是需要自己赋予的。
白驹过隙,时光荏苒,珍惜当下~~
写博客一方面是对自己学习的一点点总结及记录
,另一方面则是希望能够帮助更多对大数据感兴趣的朋友。如果你也对大数据与机器学习
感兴趣,可以关注我的动态https://blog.csdn.net/qq_35050438
,让我们一起挖掘数据与人工智能的价值~
文章目录
- SparkStreaming集成kafka:
- Direct方式:
- Direct优点:
- KafkaUtils.createDirectStream()三个参数:
- 集成kafka实战代码(scala语言)
- Producer发送问题:
- 解决办法一:
- 解决办法二:
- 解决办法三:
SparkStreaming集成kafka:
SparkStreaming提供了两种方式对接kafka数据源:Receiver,Direct
streaming-kafka0.10之后已经不在支持receiver
Direct方式:
Spark1.3之后引入
周期性的查询kafka
,获取每个partition的最新的offset,从而定义每个batch的offset范围。当处理数据的 job 启动时,就会使用 Kafka 的简单 consumer api来获取 Kafka 指定 offset 范围的数据。采用kafka的consumer api方式读取数据
:当batch任务触发时,由executor读取数据,并参与到其他executor的数据计算过程中去,driver来决定读取多少offsets,并将offsets交由checkpoints来维护。将触发下次batch任务,再由executor读取kafka数据并计算。
Direct优点:
简化并行读取
:读取多个partition,不需要创建多个DStream,然后对他们进行union操作,spark会自动创建和kafka partition一样多的RDD partition,且并行的从Kafka中读取。
高性能
:由于kafka本身就有高可靠的机制,会对数据复制,所以只要从kafka中通过kafka的副本进行恢复即可。
一次且仅一次的事务机制
:基于 direct 的方式,使用 kafka 的简单 api,Spark Streaming 自己就负责追踪消费的 offset,并保存在 checkpoint 中。Spark 自己一定是同步的,因此可以保证数据是消费一次且仅消费一次。
KafkaUtils.createDirectStream()三个参数:
ssc : StreamingContext
:流处理上下文类LocationStrategies
: Consumer 调度分区的位置策略LocationStrategies.PreferConsistent
:每个 Spark Executor 作 为 一 个Consumer,Executor 与 Kafka Partition 一对一 。大多数情况下使用,主要是为了均匀分布。
LocationStrategies.PreferBrokers
:每个 Spark Executor 与 Kafka Broker 在相同 HOST 上。也就是说 Spark Executor 优先分配到 Kafka Broker 上。
LocationStrategies.PreferFixed
:Spark Executor 与 Kafka Partition 使用固定的映射(fixed mapping)。如果负载不均衡,可以通过这种方式来手动指定分配方式,当其他没有在 fixed mapping 中指定的,均采用
PreferConsistent 方式分配。
ConsumerStrategies
: 指消费策略。ConsumerStrategies.Subscribe
:允许订阅固定的主题集合ConsumerStrategies.Assign
:指定固定的分区集合。
集成kafka实战代码(scala语言)
sparkstreaming从kafka中拉取数据进行流处理后再放入Kafka分区中
应用程序是部署在YARN群集上的长时间运行的Spark Streaming作业。该job从Kafka中接收数据,校验数据,将其转换成Avro二进制格式,并将其发送到另一个Kafka的topic
Producer发送问题:
producer是在driver上创建,但消息被发送到executor。这个producer与Kafka的brokers保持套接字连接,以至于它不能被序列化和在网络中发送
解决办法一:
把创建producer的方法写在每个分区里,也就是在executor上都各自发送消息的时候创建和关闭producer,就逃避了序列化,
**缺陷:**每个消息都会创建和关闭producer。与集群建立连接是需要时间的,由于Kafka procuder需要在所有的分区中查找leaders,所以它比打开一个普通的套接字连接更耗时
解决办法二:
使用foreachpartition进行一个个的分区处理,相比之前一个个消息都创建一个,这个方法创建的producer对象更少,因为同一个分区在同一个executor上,同样避免了序列化,而且分区内消息相互之间共享一个producer,节省创建关闭的连接大开销。
解决办法三:
优化:广播和懒加载
方法二在大数据量下,仍然要创建很多producer,
- 我们对此使用广播的方式,让所有的executor都只用一个producer实例变量。
- 同时我们要避免序列化producer,所以我们广播的是创建producer的方法,具体的对象可以在executor执行端创建,这样就避免了序列化复杂的producer对象。
- 而且我们同时再使用懒加载的方式,不用再dirver端就广播,让它延迟加载在运行到发送数据代码的再开始创建。
- 如果我们在driver端有多个流需要我们进行处理和发送,我们此时只需要一个创建producer的方法实例即可,我们需要这里开一个单例工厂去完成更复杂的情景。
- 在executor的JVM关闭之前,我们必须关闭Kafka procuder。缺少这一步,Kafka procuder内部缓冲的所有消息都将丢失
进化到第三步我们开始写代码:
KafkaSinks:
package com.wyw.test1
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}// 多线程产生连接
class KafkaSinks[K,V](fc: () => KafkaProducer[K,V]) extends Serializable {//延迟到exectuor端创建producer对象lazy val producer: KafkaProducer[K, V] = fc()def send(topic: String, key: K,value: V) = {producer.send(new ProducerRecord[K,V](topic, key, value))}def send(topic: String,value: V) = {producer.send(new ProducerRecord[K,V](topic, value))}
}object KafkaSinks {import scala.collection.JavaConversions._def apply[K,V](conf: Map[String,Object]): KafkaSinks[K,V] = {// 创建一个new类需要得匿名函数val func = () => {val prod = new KafkaProducer[K, V](conf)// executorJVM关闭时,producer也得关闭sys.addShutdownHook{prod.close()}prod}// new 创建producer对象方法的类new KafkaSinks[K,V](func)}def apply[K,V](conf: Properties): KafkaSinks[K,V] = apply(conf.toMap)}
ProducerMethodSingleDAO
:
driver端单例创建producer的方法,防止广播多次
package com.wyw.test1import java.util.Properties
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.SparkSession
import org.codehaus.jackson.map.ser.std.StringSerializerobject MySingleBaseDAO {@volatile private var instance: Broadcast[KafkaSinks[String, String]] = nulldef getInstance() = {// 多线程有的会判空,有的不会判空if (instance == null) {val sc = SparkSession.builder().appName("kafka").master("local[*]").getOrCreate().sparkContextsynchronized {if (instance == null) {val p = new Properties()p.setProperty("bootstrap.servers", "192.168.56.101:9092")p.setProperty("key.serializer", classOf[StringSerializer].getName)p.setProperty("value.serializer", classOf[StringSerializer].getName)instance = sc.broadcast(KafkaSinks[String,String](p))}instance}}instance}
}
KafkaStreamKafkaConsumer:
处理接收和发送的主代码
package com.wyw.test1import com.google.gson.Gson
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import shaded.parquet.org.slf4j.LoggerFactoryobject FlumeKafkaStreamConsumer {private val LOG = LoggerFactory.getLogger("Kafka2KafkaStreaming")def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[2]").setAppName("mykafka")val sc: SparkContext = new SparkContext(conf)// 流处理上下文类val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))// 因为有状态DStream,所以必须要有记录
// ssc.checkpoint("e:\\大数据\\mykafka-log")// 创建连接kafka服务器参数val kafkaParam = Map(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.56.101:9092",// 选定一组消费者去读数据,ConsumerConfig.GROUP_ID_CONFIG -> "flumeKafkaStream5",ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "true",ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG -> "20000",// 每组消费者读完会把指针读到末尾无法返回,这个配置将指针放到开头ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])// 创建Direct流, val streams: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](Set("flumeKafkaStream5"), kafkaParam))val kafkaProducer = MySingleBaseDAO.getInstance()
// 非单例代码
// val kafkaProducer: Broadcast[KafkaSinks[String, String]] = {// val kafkaProducerConfig = {// val p = new Properties()
// p.setProperty("bootstrap.servers", "192.168.56.101:9092")
// p.setProperty("key.serializer", classOf[StringSerializer].getName)
// p.setProperty("value.serializer", classOf[StringSerializer].getName)
// p
// }
// if (LOG.isInfoEnabled)
// LOG.info("kafka producer init done!")
// ssc.sparkContext.broadcast(KafkaSinks[String, String](kafkaProducerConfig))
// }streams.foreachRDD(rdd => {val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges// 如果rdd有数据if (!rdd.isEmpty()) {// 代码在将会在每个executor里执行处理数据rdd.map(_.value()).filter(_.split(",").size > 1).flatMap(line => {val ids = line.split(",")ids(1).split(" ").map(word => (ids(0), word))}).foreachPartition(partition => {// 使用广播变量发送到Kafkapartition.foreach(record => {kafkaProducer.value.send("flumeKafkaStream6", new Gson().toJson(record))})})}})ssc.start()ssc.awaitTermination()}}
SparkStreaming 系列(二)kafka与Streaming集成direct流实战---多流集群高并发场景代码演示相关推荐
- spark-streaming系列------- 3. Kafka DirectDStream方式数据的接收
spark-streaming系列------- 3. Kafka DirectDStream方式数据的接收 KafkaRDD分区个数的确定和每个分区数据接收的计算 在KafkUtils.create ...
- Kafka之四:Kafka与Streaming集成
Kafka之四:Kafka与Streaming集成 文章目录 Kafka之四:Kafka与Streaming集成 1. 修改IEDA的maven配置 2. 程序一 3. 程序二:统计次数 4. 提交任 ...
- Spring Boot集成Redis缓存之模拟高并发场景处理
前言 同样我们以上一篇文章为例子,搭建好环境之后,我欧美可以模拟高并发场景下,我们的缓存效率怎么样,到底能不能解决我们实际项目中的缓存问题.也就是如何解决缓存穿透? Spring Boot集成Redi ...
- Hadoop集群高可用及zookeeper+kafka组件搭建
目录 一.Hadoop集群高可用 1.Zookeeper概述 1)Zookeeper基本概述 3)Zab协议 3)observer 4)zookeeper集群图例 2.zookeeper集群搭建 3. ...
- java高并发(二十一)高并发场景下缓存常见问题
缓存一致性 当数据实时性要求很高时,需要保证缓存中的数据与数据库中的数据一致,缓存节点与副本中的数据一致,不能出现差异现象,这就比较依赖缓存的过期和更新策略了.一般会在数据发生更改的时候,主动跟新缓存 ...
- 消息中间件→产生原因、JMS与AMQP、主流消息中间件、基本概念、ActiveMQ、集群、实际场景问题解决方案、集成rabbitMQ与kafka
产生原因 消息中间件 JMS规范与AMQP协议 MQ中间件优劣 中间件基本概念 队列 主题 JMS编码接口关系 ActiveMQ官网 https://activemq.apache.org/ 队列模式 ...
- 09_Flink入门案例、word-count程序(java和scala版本)、添加依赖、Flink Streaming和Batch的区别 、在集群上执行程序等
1.9.Flink入门案例-wordCount 1.9.1.开发工具 1.9.2.编写java版本word-count程序 1.9.2.1.添加Flink Maven依赖 1.9.2.2.编写word ...
- Kafka不停机,如何无感知迁移ZooKeeper集群?
Kafka 在 Yelp 的应用十分广泛,Yelp 每天通过各种集群发送数十亿条消息,在这背后,Kafka 使用 Zookeeper 完成各种分布式协调任务. 因为Yelp 非常依赖 Kafka,那么 ...
- k8s和harbor的集成_爱威尔-基于kubernetes集群的项目持续集成(gitlab+harbor+Jenkins)安装...
这个算是基于kubernetes集群的项目持续集成的前导篇,先把这用环境搭建好我们后面就可以专注做基于k8s的docker化项目持续集成了. gitlab安装 https://about.gitlab ...
最新文章
- easyui datagrid 表头 sort 排序
- 【❤️算法系列之二叉树的实现(包含前序、中序、后序遍历以及节点的查找和删除)❤️】
- ad软件侵权律师函_Aspen Plus 9 软件安装教程
- python函数可变参数_python中函数的可变参数
- 霍尼韦尔dcs系统组态手册_DCS和PLC有“血缘关系”吗?
- 广度优先搜索算法(Breath-first Search)是如何搜索一张图的?
- 【2031】求一元三次方程的解
- solidworks迈迪设计宝_做非标机械设计必备的辅助工具,如米思米、怡合达、英科宇等...
- AIDL解析(一):AIDL原理解析
- 浅析泛在电力物联网及国网公司“三型两网”战略
- poi excel 插入批注
- iOS内购实现及测试Check List
- 阿里小程序亮相2019上海云峰会:大生态促成许多“小而美”
- java小组口号,小组口号大全
- ctfshow XXE web373-web378 wp
- 海思Hi3516dv300屏幕调试MIPI TX接口LCD屏幕(京东方JD9366)
- 讯飞星火 VS 文心一言:谁是中文大语言模型的TOP1?
- win10右键 发送到 选项消失解决办法
- ms08_067,ms10_046漏洞复现与利用
- 在html中控制自动换行 1