Apache Kafka + Spark Streaming Integration
1.目标
为了构建实时应用程序,Apache Kafka - Spark Streaming Integration是最佳组合。因此,在本文中,我们将详细了解Kafka中Spark Streaming Integration的整个概念。此外,我们将看看Spark Streaming-Kafka示例。在此之后,我们将讨论基于接收器的方法和Kafka Spark Streaming Integration的直接方法。此外,我们将在Kafka Spark Streaming Integration中看到直接接近基于接收器的方法的优势。
那么,让我们开始Kafka Spark Streaming Integration
Apache Kafka Spark Streaming Integration
你在卡夫卡有多好
2.什么是Kafka Spark Streaming Integration?
在Apache Kafka Spark Streaming Integration中,有两种方法可以配置Spark Streaming以从Kafka接收数据,即Kafka Spark Streaming Integration。首先是使用Receivers和Kafka的高级API,第二种以及新方法是不使用Receiver。这两种方法都有不同的编程模型,例如性能特征和语义保证。
什么是Kafka-Spark Streaming Integration
让我们详细研究这两种方法。
一个。基于接收者的方法
在这里,我们使用Receiver接收数据。因此,通过使用Kafka高级消费者 API,我们实现了Receiver。此外,接收的数据存储在Spark执行程序中。然后由Kafka发起的作业 - Spark Streaming处理数据。
尽管如此,这种方法可能会在默认配置下丢失故障数据。因此,我们必须在Kafka Spark Streaming中另外启用预写日志,以确保零数据丢失。这会将所有收到的Kafka数据同步保存到分布式文件系统上的预写日志中。通过这种方式,可以在故障时恢复所有数据。
Apache Kafka工作流程| Kafka Pub-Sub Messaging
此外,我们将讨论如何在我们的Kafka Spark Streaming应用程序中使用这种基于Receiver的方法。
一世。 链接
现在,使用SBT / Maven项目定义为Scala / Java应用程序链接您的Kafka流应用程序与以下工件。
groupId = org.apache.spark artifactId = spark-streaming-kafka-0-8_2.11 version = 2.2.0
但是,在为Python应用程序部署应用程序时,我们必须添加上面的库及其依赖项。
II。 程序设计
然后,通过在流应用程序代码中导入KafkaUtils来创建输入DStream:
import org.apache.spark.streaming.kafka._ val kafkaStream = KafkaUtils.createStream(streamingContext,[ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
- import org.apache.spark.streaming.kafka._
- val kafkaStream = KafkaUtils。createStream (streamingContext,
- [ ZK quorum ] ,[ 消费者群组ID ] ,[ 消费的Kafka分区的每个主题数量] )
此外,使用createStream的变体,我们可以指定键和值类及其相应的解码器类。
III。 部署
与任何Spark应用程序一样,spark-submit用于启动您的应用程序。但是,Scala / Java应用程序和Python应用程序的细节略有不同。
详细了解Spark用例
此外,对于缺乏SBT / Maven项目管理的Python应用程序,使用-packages spark-streaming-Kafka-0-8_2.11及其依赖项可以直接添加到spark-submit。
./bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 ...
此外,我们还可以从Maven存储库下载Maven工件spark-streaming-Kafka-0-8-assembly的JAR。然后使用-jars将其添加到spark-submit。
湾 直接方法(无接收器)
在基于接收器的方法之后,引入了新的无接收器“直接”方法。它确保了更强大的端到端保证。此方法定期向Kafka查询每个主题+分区中的最新偏移量,而不是使用接收器来接收数据。此外,相应地定义要在每个批次中处理的偏移范围。此外,为了从Kafka读取定义的偏移范围,使用简单的消费者API,尤其是在启动处理数据的作业时。但是,它类似于从文件系统读取文件。
注意:此功能是在Spark 1.3中为Scala和Java API引入的,在Spark 1.4中为Python API引入。
现在,让我们讨论如何在流应用程序中使用此方法。
要了解有关Consumer API的更多信息,请访问以下链接:
Apache Kafka Consumer | 卡夫卡消费者的例子
一世。链接
但是,此方法仅在Scala / Java应用程序中受支持。使用以下工件,链接SBT / Maven项目。
groupId = org.apache.spark artifactId = spark-streaming-kafka-0-8_2.11 version = 2.2.0
II。程序设计
此外,在流应用程序代码中导入KafkaUtils并创建输入DStream:
import org.apache.spark.streaming.kafka._ val directKafkaStream = KafkaUtils.createDirectStream[[key class], [value class], [key decoder class], [value decoder class] ](streamingContext, [map of Kafka parameters], [set of topics to consume])
- import org.apache.spark.streaming.kafka._
- val directKafkaStream = KafkaUtils.createDirectStream [
- [ key class ] ,[ value class ] ,[ key decoder class ] ,[ value decoder class ] ] (
- streamingContext,[ 卡夫卡参数的地图] ,[ 要消费的主题集] )
我们必须在Kafka参数中指定metadata.broker.list或bootstrap.servers。因此,默认情况下,它将从每个Kafka分区的最新偏移开始消耗。但是,如果将Kafka参数中的配置auto.offset.reset设置为最小,它将从最小偏移开始消耗。
此外,使用KafkaUtils.createDirectStream的其他变体,我们可以从任意偏移开始消耗。然后,执行以下操作以访问每批中消耗的Kafka偏移量。
- //保持对当前偏移范围的引用,以便下游可以使用它
// Hold a reference to the current offset ranges, so downstream can use it var offsetRanges = Array.empty[OffsetRange] directKafkaStream.transform { rdd =>offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRangesrdd }.map {... }.foreachRDD { rdd =>for (o <- offsetRanges) {println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")}... }
如果我们希望基于Zookeeper的Kafka监控工具显示流应用程序的进度,我们可以使用它来更新Zookeeper自己。
阅读前五篇Apache Kafka书籍| 学习卡夫卡的完整指南
III。部署
这里,部署过程类似于部署基于Receiver的方法的过程。
3.直接方法的优点
与Kafka的Spark Streaming集成相比,第二种方法优于第一种方法:
直接方法在与Kafka的Spark Streaming集成中的优势
一个。简化的并行性
不需要创建多个输入Kafka流并将它们联合起来。但是,Kafka - Spark Streaming将使用直接流创建与要使用的Kafka分区一样多的RDD分区。这将同时从Kafka读取数据。因此,我们可以说,它是Kafka和RDD分区之间的一对一映射,更容易理解和调整。
湾 效率
在第一种方法中实现零数据丢失需要将数据存储在预写日志中,这进一步复制了数据。这实际上是低效的,因为数据有效地被复制两次 - 一次由Kafka复制,第二次由预写日志复制。第二种方法消除了问题,因为没有接收器,因此不需要预写日志。只要我们有足够的Kafka保留,就可以从Kafka恢复消息。
C。完全一次的语义
基本上,我们使用Kafka的高级API在第一种方法中在Zookeeper中存储消耗的偏移量。但是,要消费来自Kafka的数据,这是一种传统方式。即使它可以确保零数据丢失,但在某些故障情况下,某些记录可能会被消耗两次。这是由于Kafka可靠接收的数据之间的不一致 - Spark Streaming和Zookeeper跟踪的偏移。因此,在第二种方法中,我们使用了一个不使用Zookeeper的简单Kafka API。
让我们修改Apache Kafka架构及其基本概念
因此,尽管出现故障,但Spark Streaming有效地接收了每条记录一次。因此,请确保将数据保存到外部数据存储的输出操作必须是幂等的,或者是保存结果和偏移的原子事务。这有助于为我们的结果输出实现完全一次的语义。
虽然,还有一个缺点,即它不会更新Zookeeper中的偏移量,因此基于Zookeeper的Kafka监视工具不会显示进度。但是,我们仍然可以在每个批次中访问此方法处理的偏移量,并自行更新Zookeeper。
所以,这就是Apache Kafka Spark Streaming Integration。希望你喜欢我们的解释。
Apache Spark中的Spark Streaming Checkpoint
4。结论
因此,在这个Kafka-Spark Streaming Integration中,我们已经详细了解了Spark Kafka与Spark Kafka集成的整个概念。此外,我们讨论了Kafka Spark Streaming配置的两种不同方法,即接收方法和直接方法
转载于:https://www.cnblogs.com/a00ium/p/10852964.html
Apache Kafka + Spark Streaming Integration相关推荐
- Apache Kafka / Spark流系统的性能调优
电信行业的实际案例研究 调试实际的分布式应用程序可能是一项艰巨的任务. 至少在一开始,最常见的Google搜索并没有什么用. 在这篇博客文章中,我将详细介绍如何将Apache Kafka / Spar ...
- Kafka:ZK+Kafka+Spark Streaming集群环境搭建(九)安装kafka_2.11-1.1.0
如何搭建配置centos虚拟机请参考<Kafka:ZK+Kafka+Spark Streaming集群环境搭建(一)VMW安装四台CentOS,并实现本机与它们能交互,虚拟机内部实现可以上网.& ...
- Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二十一)NIFI1.7.1安装
一.nifi基本配置 1. 修改各节点主机名,修改/etc/hosts文件内容. 192.168.0.120master192.168.0.121slave1192.168.0.122 slave2 ...
- Flume+Kafka+Spark Streaming+MySQL实时日志分析
文章目录 项目背景 案例需求 一.分析 1.日志分析 二.日志采集 第一步.代码编辑 2.启动采集代码 三.编写Spark Streaming的代码 第一步 创建工程 第二步 选择创建Scala工程 ...
- Kafka:ZK+Kafka+Spark Streaming集群环境搭建(十二)VMW安装四台CentOS,并实现本机与它们能交互,虚拟机内部实现可以上网。...
Centos7出现异常:Failed to start LSB: Bring up/down networking. 按照<Kafka:ZK+Kafka+Spark Streaming集群环境搭 ...
- Kafka+Spark Streaming如何保证exactly once语义
大数据技术与架构 点击右侧关注,大数据开发领域最强公众号! 暴走大数据 点击右侧关注,暴走大数据! 在Kafka.Storm.Flink.Spark Streaming等分布式流处理系统中(没错,Ka ...
- 大数据Spark “蘑菇云”行动第76课: Kafka+Spark Streaming+Redis项目实战
大数据Spark "蘑菇云"行动第76课: Kafka+Spark Streaming+Redis项目实战 jedis插件 redis <dependency> ...
- Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二十八):kafka0.10.1 内置性能测试API用法示例...
消费者测试: ./kafka-consumer-perf-test.sh --zookeeper vm10.60.0.11.com.cn:2181,vm10.60.0.7.com.cn:2181,vm ...
- 【python+flume+kafka+spark streaming】编写word_count入门示例
一. 整体架构的一些理解 1.整体架构的理解: 架构中的角色分为了数据采集,数据缓冲,还有数据处理. flume由于输入和输出的接口众多,于是利用这特点来实现无编程的数据采集. 无编程的数据采集,我是 ...
- Kafka:ZK+Kafka+Spark Streaming集群环境搭建(十九)ES6.2.2 安装Ik中文分词器
注: elasticsearch 版本6.2.2 1)集群模式,则每个节点都需要安装ik分词,安装插件完毕后需要重启服务,创建mapping前如果有机器未安装分词,则可能该索引可能为RED,需要删除后 ...
最新文章
- mysql shharding_mysql 技术内幕 的示例数据库
- c语言关键字_C语言初学者必须掌握的关键字!
- 论文浅尝 | Understanding Black-box Predictions via Influence Func
- 一行 Python 代码能干什么?有意思!
- Java 中如何实现保留两位小数 — DecimalFormat
- Java序谈之通讯录制作
- 小米手机测试代码电池测试代码
- TextRank算法原理简析、代码实现
- 斑马打印机ZPL语言和EPL语言的区别是什么
- 小鸟云服务器上线了新的操作系统
- 仿微信九宫格群聊view
- 【好数推荐】数据堂平均音色语音库
- 四川大学计算机专业调剂,2020年四川大学计算机学院(软件学院)考研调剂信息...
- 页面级优化——icon图标显示方式
- jQuery插件-支持天干地支阴历阳历万年历节假日红字显示记事等功能的日历插件(1)...
- Paperreading之五 Stacked Hourglass Networks(SHN)和源码阅读(PyTorch版本)
- 【urllib的使用(下)】
- 一周一书一文(20160814):让未来现在就来——彭小六
- 计算机中的三类总线是什么,计算机的三类总线分别是什么?
- Revit2016 外部工具添加 ---win10
热门文章
- 基于springboot+vue的智慧教室预约系统(前后端分离)
- linux下进程号,Linux下C++获取进程号
- html css菜鸟,CSS菜鸟教程阅读笔记
- spring boot Junit5单元测试
- Centos7安装 mariadb 最新版
- ubuntu server 14.04/16.x 开启 root ssh 登录
- Linux 初级命令
- rmd文件怎么转换html文件,如何将Rmd文件的html输出向左对齐
- 小D课堂 - 新版本微服务springcloud+Docker教程_4-05 微服务调用方式之feign 实战 订单调用商品服务...
- 阶段1 语言基础+高级_1-3-Java语言高级_05-异常与多线程_第6节 Lambda表达式_6_Lambda表达式有参数有返回值的...