写在前面: 我是「nicedays」,一枚喜爱做特效,听音乐,分享技术大数据开发猿。这名字是来自world order乐队的一首HAVE A NICE DAY。如今,走到现在很多坎坷和不顺,如今终于明白nice day是需要自己赋予的。
白驹过隙,时光荏苒,珍惜当下~~
写博客一方面是对自己学习的一点点总结及记录,另一方面则是希望能够帮助更多对大数据感兴趣的朋友。如果你也对 大数据与机器学习感兴趣,可以关注我的动态 https://blog.csdn.net/qq_35050438,让我们一起挖掘数据与人工智能的价值~

文章目录

  • SparkStreaming集成kafka:
    • Direct方式:
    • Direct优点:
    • KafkaUtils.createDirectStream()三个参数:
    • 集成kafka实战代码(scala语言)
      • Producer发送问题:
        • 解决办法一:
        • 解决办法二:
        • 解决办法三:

SparkStreaming集成kafka:

SparkStreaming提供了两种方式对接kafka数据源:Receiver,Direct

streaming-kafka0.10之后已经不在支持receiver

Direct方式:

Spark1.3之后引入

  • 周期性的查询kafka,获取每个partition的最新的offset,从而定义每个batch的offset范围。当处理数据的 job 启动时,就会使用 Kafka 的简单 consumer api来获取 Kafka 指定 offset 范围的数据。

  • 采用kafka的consumer api方式读取数据:当batch任务触发时,由executor读取数据,并参与到其他executor的数据计算过程中去,driver来决定读取多少offsets,并将offsets交由checkpoints来维护。将触发下次batch任务,再由executor读取kafka数据并计算。

Direct优点:

  1. 简化并行读取

    读取多个partition,不需要创建多个DStream,然后对他们进行union操作,spark会自动创建和kafka partition一样多的RDD partition,且并行的从Kafka中读取。

  2. 高性能

    由于kafka本身就有高可靠的机制,会对数据复制,所以只要从kafka中通过kafka的副本进行恢复即可。

  3. 一次且仅一次的事务机制

    基于 direct 的方式,使用 kafka 的简单 api,Spark Streaming 自己就负责追踪消费的 offset,并保存在 checkpoint 中。Spark 自己一定是同步的,因此可以保证数据是消费一次且仅消费一次。

KafkaUtils.createDirectStream()三个参数:

  1. ssc : StreamingContext:流处理上下文类

  2. LocationStrategies: Consumer 调度分区的位置策略

    • LocationStrategies.PreferConsistent

      每个 Spark Executor 作 为 一 个ConsumerExecutor 与 Kafka Partition 一对一大多数情况下使用,主要是为了均匀分布。

    • LocationStrategies.PreferBrokers

      每个 Spark Executor 与 Kafka Broker 在相同 HOST 上。也就是说 Spark Executor 优先分配到 Kafka Broker 上。

    • LocationStrategies.PreferFixed

      Spark Executor 与 Kafka Partition 使用固定的映射(fixed mapping)。如果负载不均衡,可以通过这种方式来手动指定分配方式,当其他没有在 fixed mapping 中指定的,均采用
      PreferConsistent 方式分配。

  3. ConsumerStrategies: 指消费策略。

    • ConsumerStrategies.Subscribe:允许订阅固定的主题集合
    • ConsumerStrategies.Assign:指定固定的分区集合。

集成kafka实战代码(scala语言)

sparkstreaming从kafka中拉取数据进行流处理后再放入Kafka分区中

应用程序是部署在YARN群集上的长时间运行的Spark Streaming作业。该job从Kafka中接收数据,校验数据,将其转换成Avro二进制格式,并将其发送到另一个Kafka的topic

Producer发送问题:

producer是在driver上创建,但消息被发送到executor。这个producer与Kafka的brokers保持套接字连接,以至于它不能被序列化和在网络中发送

解决办法一:

把创建producer的方法写在每个分区里,也就是在executor上都各自发送消息的时候创建和关闭producer,就逃避了序列化,

**缺陷:**每个消息都会创建和关闭producer。与集群建立连接是需要时间的,由于Kafka procuder需要在所有的分区中查找leaders,所以它比打开一个普通的套接字连接更耗时

解决办法二:

使用foreachpartition进行一个个的分区处理,相比之前一个个消息都创建一个,这个方法创建的producer对象更少,因为同一个分区在同一个executor上,同样避免了序列化,而且分区内消息相互之间共享一个producer,节省创建关闭的连接大开销。

解决办法三:

优化:广播和懒加载

方法二在大数据量下,仍然要创建很多producer,

  1. 我们对此使用广播的方式,让所有的executor都只用一个producer实例变量。
  2. 同时我们要避免序列化producer,所以我们广播的是创建producer的方法,具体的对象可以在executor执行端创建,这样就避免了序列化复杂的producer对象。
  3. 而且我们同时再使用懒加载的方式,不用再dirver端就广播,让它延迟加载在运行到发送数据代码的再开始创建。
  4. 如果我们在driver端有多个流需要我们进行处理和发送,我们此时只需要一个创建producer的方法实例即可,我们需要这里开一个单例工厂去完成更复杂的情景。
  5. 在executor的JVM关闭之前,我们必须关闭Kafka procuder。缺少这一步,Kafka procuder内部缓冲的所有消息都将丢失

进化到第三步我们开始写代码:

KafkaSinks:

package com.wyw.test1
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}// 多线程产生连接
class KafkaSinks[K,V](fc: () => KafkaProducer[K,V]) extends Serializable {//延迟到exectuor端创建producer对象lazy val producer: KafkaProducer[K, V] = fc()def send(topic: String, key: K,value: V) = {producer.send(new ProducerRecord[K,V](topic, key, value))}def send(topic: String,value: V) = {producer.send(new ProducerRecord[K,V](topic, value))}
}object KafkaSinks {import scala.collection.JavaConversions._def apply[K,V](conf: Map[String,Object]): KafkaSinks[K,V] = {// 创建一个new类需要得匿名函数val func = () => {val prod = new KafkaProducer[K, V](conf)// executorJVM关闭时,producer也得关闭sys.addShutdownHook{prod.close()}prod}// new 创建producer对象方法的类new KafkaSinks[K,V](func)}def apply[K,V](conf: Properties): KafkaSinks[K,V] = apply(conf.toMap)}

ProducerMethodSingleDAO

driver端单例创建producer的方法,防止广播多次

package com.wyw.test1import java.util.Properties
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.SparkSession
import org.codehaus.jackson.map.ser.std.StringSerializerobject MySingleBaseDAO {@volatile private var  instance: Broadcast[KafkaSinks[String, String]] = nulldef getInstance() = {// 多线程有的会判空,有的不会判空if (instance == null) {val sc = SparkSession.builder().appName("kafka").master("local[*]").getOrCreate().sparkContextsynchronized {if (instance == null) {val p = new Properties()p.setProperty("bootstrap.servers", "192.168.56.101:9092")p.setProperty("key.serializer", classOf[StringSerializer].getName)p.setProperty("value.serializer", classOf[StringSerializer].getName)instance = sc.broadcast(KafkaSinks[String,String](p))}instance}}instance}
}

KafkaStreamKafkaConsumer:

处理接收和发送的主代码

package com.wyw.test1import com.google.gson.Gson
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import shaded.parquet.org.slf4j.LoggerFactoryobject FlumeKafkaStreamConsumer {private val LOG = LoggerFactory.getLogger("Kafka2KafkaStreaming")def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[2]").setAppName("mykafka")val sc: SparkContext = new SparkContext(conf)// 流处理上下文类val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))// 因为有状态DStream,所以必须要有记录
//    ssc.checkpoint("e:\\大数据\\mykafka-log")// 创建连接kafka服务器参数val kafkaParam = Map(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.56.101:9092",// 选定一组消费者去读数据,ConsumerConfig.GROUP_ID_CONFIG -> "flumeKafkaStream5",ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "true",ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG -> "20000",// 每组消费者读完会把指针读到末尾无法返回,这个配置将指针放到开头ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])// 创建Direct流, val streams: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](Set("flumeKafkaStream5"), kafkaParam))val kafkaProducer = MySingleBaseDAO.getInstance()
// 非单例代码
//    val kafkaProducer: Broadcast[KafkaSinks[String, String]] = {//      val kafkaProducerConfig = {//        val p = new Properties()
//        p.setProperty("bootstrap.servers", "192.168.56.101:9092")
//        p.setProperty("key.serializer", classOf[StringSerializer].getName)
//        p.setProperty("value.serializer", classOf[StringSerializer].getName)
//        p
//      }
//      if (LOG.isInfoEnabled)
//        LOG.info("kafka producer init done!")
//      ssc.sparkContext.broadcast(KafkaSinks[String, String](kafkaProducerConfig))
//    }streams.foreachRDD(rdd => {val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges// 如果rdd有数据if (!rdd.isEmpty()) {// 代码在将会在每个executor里执行处理数据rdd.map(_.value()).filter(_.split(",").size > 1).flatMap(line => {val ids = line.split(",")ids(1).split(" ").map(word => (ids(0), word))}).foreachPartition(partition => {// 使用广播变量发送到Kafkapartition.foreach(record => {kafkaProducer.value.send("flumeKafkaStream6", new Gson().toJson(record))})})}})ssc.start()ssc.awaitTermination()}}

SparkStreaming 系列(二)kafka与Streaming集成direct流实战---多流集群高并发场景代码演示相关推荐

  1. spark-streaming系列------- 3. Kafka DirectDStream方式数据的接收

    spark-streaming系列------- 3. Kafka DirectDStream方式数据的接收 KafkaRDD分区个数的确定和每个分区数据接收的计算 在KafkUtils.create ...

  2. Kafka之四:Kafka与Streaming集成

    Kafka之四:Kafka与Streaming集成 文章目录 Kafka之四:Kafka与Streaming集成 1. 修改IEDA的maven配置 2. 程序一 3. 程序二:统计次数 4. 提交任 ...

  3. Spring Boot集成Redis缓存之模拟高并发场景处理

    前言 同样我们以上一篇文章为例子,搭建好环境之后,我欧美可以模拟高并发场景下,我们的缓存效率怎么样,到底能不能解决我们实际项目中的缓存问题.也就是如何解决缓存穿透? Spring Boot集成Redi ...

  4. Hadoop集群高可用及zookeeper+kafka组件搭建

    目录 一.Hadoop集群高可用 1.Zookeeper概述 1)Zookeeper基本概述 3)Zab协议 3)observer 4)zookeeper集群图例 2.zookeeper集群搭建 3. ...

  5. java高并发(二十一)高并发场景下缓存常见问题

    缓存一致性 当数据实时性要求很高时,需要保证缓存中的数据与数据库中的数据一致,缓存节点与副本中的数据一致,不能出现差异现象,这就比较依赖缓存的过期和更新策略了.一般会在数据发生更改的时候,主动跟新缓存 ...

  6. 消息中间件→产生原因、JMS与AMQP、主流消息中间件、基本概念、ActiveMQ、集群、实际场景问题解决方案、集成rabbitMQ与kafka

    产生原因 消息中间件 JMS规范与AMQP协议 MQ中间件优劣 中间件基本概念 队列 主题 JMS编码接口关系 ActiveMQ官网 https://activemq.apache.org/ 队列模式 ...

  7. 09_Flink入门案例、word-count程序(java和scala版本)、添加依赖、Flink Streaming和Batch的区别 、在集群上执行程序等

    1.9.Flink入门案例-wordCount 1.9.1.开发工具 1.9.2.编写java版本word-count程序 1.9.2.1.添加Flink Maven依赖 1.9.2.2.编写word ...

  8. Kafka不停机,如何无感知迁移ZooKeeper集群?

    Kafka 在 Yelp 的应用十分广泛,Yelp 每天通过各种集群发送数十亿条消息,在这背后,Kafka 使用 Zookeeper 完成各种分布式协调任务. 因为Yelp 非常依赖 Kafka,那么 ...

  9. k8s和harbor的集成_爱威尔-基于kubernetes集群的项目持续集成(gitlab+harbor+Jenkins)安装...

    这个算是基于kubernetes集群的项目持续集成的前导篇,先把这用环境搭建好我们后面就可以专注做基于k8s的docker化项目持续集成了. gitlab安装 https://about.gitlab ...

最新文章

  1. easyui datagrid 表头 sort 排序
  2. 【❤️算法系列之二叉树的实现(包含前序、中序、后序遍历以及节点的查找和删除)❤️】
  3. ad软件侵权律师函_Aspen Plus 9 软件安装教程
  4. python函数可变参数_python中函数的可变参数
  5. 霍尼韦尔dcs系统组态手册_DCS和PLC有“血缘关系”吗?
  6. 广度优先搜索算法(Breath-first Search)是如何搜索一张图的?
  7. 【2031】求一元三次方程的解
  8. solidworks迈迪设计宝_做非标机械设计必备的辅助工具,如米思米、怡合达、英科宇等...
  9. AIDL解析(一):AIDL原理解析
  10. 浅析泛在电力物联网及国网公司“三型两网”战略
  11. poi excel 插入批注
  12. iOS内购实现及测试Check List
  13. 阿里小程序亮相2019上海云峰会:大生态促成许多“小而美”
  14. java小组口号,小组口号大全
  15. ctfshow XXE web373-web378 wp
  16. 海思Hi3516dv300屏幕调试MIPI TX接口LCD屏幕(京东方JD9366)
  17. 讯飞星火 VS 文心一言:谁是中文大语言模型的TOP1?
  18. win10右键 发送到 选项消失解决办法
  19. ms08_067,ms10_046漏洞复现与利用
  20. 在html中控制自动换行 1

热门文章

  1. 非CI执行Allure2 trends空白问题
  2. 全国计算机等级考试——三级网络技术复习资料
  3. 伺服电机抖动的原因?如何解决?看完你就懂了
  4. 2022年最新西藏建筑八大员(市政)模拟考试题库及答案
  5. 2022年安徽最新建筑八大员(标准员)考试考点及答案
  6. 关于UI设计行业的认识再到认识
  7. GDevelop 5开发经验分享之 Coriander Games 工作室开发游戏的经验分享
  8. esxi搭建truenas
  9. WordPress伪静态规则设置方法
  10. 对两个SQL查询出来的结果进行计算