一、kafka是什么?
目前市面上消息队列有很多,比如activeMQ、rabbitMQ、rocketMQ等等,性能和功能性方面各有优劣,而kafka呢,说它是消息队列,但其实和传统的消息队列不太一样,首先它没有遵守JMS(java message service)规范,也不完全像传统的mq那种架构,它是分布式的发布-订阅模式的架构。以集群的方式运行,可持久化消息,流式处理数据。然后,它与传统的MQ相比,最大的优势是吞吐量很高,主要得益于kafka会把消息存放在文件中,响应时直接将消息从文件系统中发送到网络通道中,避免了字节复制。不过与传统的MQ相比,kafka并没有那么高的消息传递可靠性,比如有可能会发生重复发送消息、丢失消息或者消息乱序等等,下文中会提及如何配置broker、producer及consumer来尽可能的保证消息的可靠性。
二:名词介绍
producer:生产者,发送消息的一方

broker:kafka的节点,多个broker可以组成kafka集群,对生产者的工作内容包括接收消息,设置offset,提交消息到磁盘持久化。对消费者的工作内容包括响应消费者的请求,读取磁盘上的消息返回给消费者。(broker中还有一个controller的角色,是一开始所有broker去zookeeper上争抢注册controller临时节点时产生的,负责分配分区给其他broker以及监控其他broker)

zookeeper:提供给broker注册的功能,将集群中的broker管理起来,0.10.0版本之前会保存消费者的offset等元数据,0.10.0版本开始这些都由kafka保存(kafka由名称为__consumer_offsets的topic保存)了,不再依赖zookeeper。

offset:偏移量,不断递增的整数,在创建消息时,broker将其添加到分区内,每个消息的offset是唯一的,消费者会把每个分区最后读取的消息的offset存到zookeeper或kafka上(版本不同导致),这样保证consumer关闭或重启,读取状态也不会消失。

partition:分区,物理上将消息进行分类,kafka通过分区来实现数据冗余和伸缩性。一个分区就是一个提交日志,同一个分区内的消息是有序的,但topic包含多个partition,所以不一定是有序的了(partition还有一个leader角色,他是broker指定的,同一个topic下的其他partiton都为replica,leader负责读写消息,其他replica负责从leader中同步消息)。

consumer:消息的消费者,也需要连接zookeeper

topic:主题,逻辑上将消息进行分类,包含一个或多个分区,可横跨多个broker。

consumerGroup:消费者群组,同一群组内的消费者,只能接收到一次订阅的topic的消息,群组保证每个分区只能被一个消费者使用。

kafka的多集群:多数据中心(灾备),kafka的复制机制只能在单集群中进行,多集群改如何?kafka提供了一个MirrorMaker的工具,可以完成多集群间的kafka消息同步

为啥选择kafka:
1.支持多生产者、多消费者,并且消费者之前互不影响,只要消费者组成一个群组,他们就能共享一个消息,并且群组内只消费一次。

2.基于磁盘数据存储,允许消费者非实时的读取消息,并且可以根据大小、时间等规则设置保留消息的时长。

3.伸缩性,集群中可以在不影响可用性的前提下,扩展或减少broker的数量,要想提高集群的容错能力,需要配置较高的复制系数。

4.解耦、削峰

三、安装kafka
先安装java,再安装zookeeper(节点建议是奇数,且不用太大,不然影响性能,建议7个),最后安装kafka,配置broker.id、zookeeper.connect、log.dirs。

配置topic的参数:
1.num.partitons:topic下的分区数量,默认为1,为了能让分区分不到所有broker上,分区数量要大于broker的数量,是topic的扩容手段。
如何选定分区数量?
1.估算topic的吞吐量,每秒100k还是1G?
2.消费者的吞吐量?
3.broker的磁盘空间和网络带宽
估算出1和2的值,相除即可。
2.log.retention.ms:日志被保留多久,默认是168小时,还有另外两个相似的参数,设置后,kafka会选择最小的为准。
3.log.retention.byte:日志多大时被删除,注意是每一个partition的日志,如果topic有很多partition的话,日志累计容量会很大。
4.log.segment.byte:当前日志多大时,会被关闭,默认是1G。如果消息量不大时,如何调整就很重要,比如100M每天,这样就是10天才能填满,再加上log.retention.ms的7天,那就是保留17天。
5.message.max.bytes:限制单个消息的大小,默认1M,不能设置太大,因为太大的话,负责处理网路连接和请求的线程就会花费很多时间处理,并且会增加磁盘写入块的大小,影响IO。

安装kafka硬件的选择:
从磁盘吞吐量(影响生产者)和容量(取决于保留消息的时间及大小)、内存(影响消费者,因为读取的消息会保存在页面缓存中)、网络、CPU考虑。

垃圾回收器的选择:
最好选择G1,因为其会根据工作负载情况进行自我调节,而且它的停顿时间是固定的。但是kafka默认启动时的回收期是Parallel New和CMS,需要手动设置一下。

四、生产者详解
producer生成消息的格式:topoc、【partition】、【key】、value。
消息发送到broker后,先经过 序列化器 将消息的key和value进行序列化,转成字节数组,然后经过 分区器 将消息发送到指定的partition。(这时如果消息的partition不为空的话,则会被分配到指定的partition上)
producer的同步发送与异步发送:
1.等待broker响应
2.不需要等待broker响应
序列化器:可以自定义序列化器(继承一个kafka的Serializer接口,实现它的serialize方法),也可以使用kafka提供的序列化器,比如intSerializer或者StringSerializer,或者使用apache Avro进行序列化。
apache Avro流程:producer序列化时需要去schema注册表中获取当前schema,消费者反序列化时也需要到schema注册表中获取schema才能进行反序列化。
代码实现:props.put(“schema.registry.url”,schemaUrl);
分区器:根据不同的业务规则或场景,自定义分区器(继承一个kafka的接口,实现它的方法)。如果不自定义分区器的话,为key生成hash值,映射到指定的分区上,保证同一个键在同一个分区上,broker会根据round robin(轮询)的方式将消息均匀的分布到各个分区上,最好一开始就把partition的数量规划好,后续不建议修改。
重要参数:
必填的参数只有三个,分别是bootstrp.servers(建议至少提供两个,其中一个宕机,还能够使用另外一个),keySerializer和valueSerializer。
ack:设置producer是否等待所有broker都确认了消息,0代表不等待,1代表只等待leader确认,all代表等待所有 存活的 broker都确认。all使得kafka不会丢失消息
buffer.memory:内存缓冲区的大小.
compression.type:压缩算法,snappy性价比高
retries:重试次数
batch.size:发送统一批次的消息使用多大的内存,生产者并不一定会等到达到这个值了才会把消息都发出去,所以可以设置的大一点。
linger.ms:发送批次之前等待更多消息加入批次的时间,和上一个参数先达到哪个,就发送消息。
max.in.flight.requests.per.connection:设置为1可以保证消息是按顺序写入服务器的。
timeout.ms:等待返回确认消息的时间,和ack共同使用。
max.request.size:发送的单个消息的最大值,最好和消费者能接收的最大消息的值匹配。

五、消费者详解
消费者群组:多个消费者使用同一个group.id后,就会组成一个群组,同一个群组中的消费者订阅同一个主题,每个消费者接受topic中的一部分消息,对消息进行分流(消费者的数量一定要少于topic中partition的数量,因为消费者与partition是一对多的关系,如果超过partition的数量,会导致一部分的消费者无法接受消息),不同群组中的消费者可以多次接受同一个消息。

消费者群组的出现,实现了kafka设计时的主要目标,topic中的消息可以满足企业各种应用场景的需求,只要使用不同的群组即可,并且在群组中可以横向伸缩消费者的数量而不会对性能造成负面影响。
rebalance:再均衡的情况会在以下三种情况中发生:1.新消费者加入群组的时候2.群组中有消费者被关闭或崩溃的时候3.topic发生变化时,比如新加入了partiiton的时候
rebalance很重要,它保证了kafka能够具备高可用性及伸缩性(我们可以放心的添加和减少消费者),不过发生rebalance的时候,群组会有一小段时间不可用,所以应尽量避免rebalance的发生。consumer的心跳:consumer会向群组协调器的broker发送心跳,表示自己是处于存活状态的,发送心跳的情况为:
1.轮询的时候(consumer去broker那里获取消息的时候)
2.提交offset的时候

如果consumer长时间未向broker发送心跳,则被认为已死亡,此时会触发一次rebalance。
心跳机制随着版本变化而发生改变:在0.10.1版本里,kafka新引入了一个独立的心跳线程,将心跳与轮询的动作分开,使发送心跳的频率更加灵活。

consumer分配分区的流程(每次rebalance时都会发生):
1.所有consumer会向 群组协调器 的broker发送joinGroup请求,第一个加入的被选为群主
2.群主从broker中获取当前所有consumer的信息
3.群主使用分配策略进行分区分配
4.将分配结果返回给broker
5.broker将结果通知给群组中其他的consumer

consumer订阅topic:
使用subcribe方法,将topic的名称传入参数,也可以使用正则表达式当做参数,匹配所有符合格式的topic。
使用poll方法,参数是一个超时时间,如果该参数被设置为0,则poll会立刻返回,否则会在指定的毫秒数内等待broker返回数据。
最好使用一个线程运行一个消费者。

轮询:
轮询是消费者API的核心,它包括以下动作:
1.群组协调 2.rebalance 3.发送心跳 4.获取数据

主要参数:
1.fetch.min.bytes:获取消息的最小字节数,,会等到有足够的的消息来达到这个值时才返回给消费者,降低consumer和broker的工作负载,比如当消息不是很活跃的时候,就不需要来来回回的处理数据。
2.fetch.max.wait.ms:指定broker的等待时间才返回数据,默认是500ms,与上一个参数哪个先到达临界值,就会返回消息。
3.max.partition.fetch.bytes:每个分区返回给消费者的最大字节数,默认是1M,它的值必须比broker能够接受的最大消息字节数(max.message.size)大,否则消费者无法消费该消息。
4.session.timeout.ms:consumer死亡前可以与服务器断开连接的时间,默认是3s,如果在该值指定的时间内没有发送心跳,则被认为该consumer已经死亡,导致群组rebalance,如果输入值的比较小,可以更快的检测和恢复节点,不过可能会到导致非预期的rebalance。
5.heartbeat.interval.ms:制定了poll方法想协调器发送心跳的频率,一般要设置的比上一个参数小,比如1s。
6.enable.auto.commit:设置是否自动提交offset,默认是true,不过为了避免出现重复数据和消息丢失,应设置为false。
7.auto.offset.reset:consumer在读取一个没有offset的分区时,应该怎么办,默认是latest,从最新的开始读取,另一个参数是earliest,从最开始的消息开始读取。
8.partition.assignment.strategy:分区分配策略
range策略:若干个连续的分区分配给消费者。
round robin策略:逐个分配给消费者,该策略会把分区均匀的分配给消费者。

提交offset:
consumer向一个叫 _consumer_offset的特殊topic内发送消息,消息内包含每个分区的offset,这样当发生rebalance的时候,新的consumer会读取该topic,从指定的offset中开始处理消息。
提交的方式:
1.自动提交:每过5s,消费者会将poll中接收到的最大偏移量提交上去,但是这样会发生重复读取消息。
2.提交当前offset(同步提交):将enable.aito.commit设置为false,使用commitSync方法提交。
3.异步提交:同步提交有一个缺点,在broker响应之前,会一直阻塞,影响kafka的吞吐量,所以可以将提交改为异步的,使用commitAsync方法,但这样虽然提高了吞吐量,却会出现上一个未收到响应,后面的已经提交成功的情况。
4.同步、异步组合提交:能够确保批次的最大offset被提交
5.提交指定的offset:如果poll返回了很多数据,想要在大量数据中间提交一次offset,就可以使用如下方法,定义一个map,将topic和partition的数据放进去,当做参数传入commitAsync中。

rebalance监听器:在调用subscribe方法时,传入ConsumerRebalacneListener实例就行了,覆盖onPartitionRevoked方法(consumer停止服务消息后至rebalance开始前执行)和onPartitionAssigned方法(重新分配分区之后和consumer再读取消息之前执行)。

从特定的offset开始处理记录:将db的操作与提交偏移量在一个原子操作里完成,流程是处理完记录后,将记录和offset插入数据库中,在失去paitition时,提交事务保存起来,rebalance后,新的consumer从db中读取offset开始处理。

六、深入理解kafka
kafka的复制机制:
kafka通过zookeeper来维护集群成员的信息,每个broker都有一个id,当broker启动时,会创建临时节点把自己的id注册到zookeeper中,如果zookeeper中已经包含了该id,则注册失效,broker加入集群失败。
broker宕机时,该broker创建的临时节点会被zookeeper自动移除,此时如果有与宕机的broker相同id的新节点来注册的话,会拥有与旧的broker相同的分区和topic。
controller的选举:当集群启动时,所有broker都会去zookeeper中创建controller临时节点,但只有一个会创建成功,创建成功的那个broker就会成为controller,同时其他broker会在controller上创建一个watch对象,监听controller的状态,一旦有变化,其他的broker再次去创建controller临时节点,新的controller会有一个新的epoch值,保证其他broker会忽略旧的epoch值,不去处理旧的controller的消息。
controller的作用:为其他broker分配分区,以及监控其他broker的状态,一旦发生宕机,及时处理(新增broker时,重新进行分区分配。broker宕机时,为partition重新选举新的的leader,保证系统的可用性)。

 kafka将自己定义为一个 分布式的、可复制的、可分区的 提交日志服务,复制之所以很关键,原因在于它保证了kafka的可用性与持久性,下面详细说明复制机制的流程:每个partition包含一个leader及若干个replica,为了保证一致性,所有生产者和消费者的请求都会经过leader,它的另一个任务是知道哪些replica与自己是一致的,并维护一个ISR列表。而replica负责同步leader的消息,保持与leader一致的状态(如何与leader保持一致?replica会发送与consumer一样的获取消息的请求,leader会响应包含消息offset的消息),如果leader崩溃了,那么其他的replica中会有一个被提升为leader。如果replica在10s内没有请求最新的消息,就会被认为是不同步的,会从ISR列表中剔除,它就不可能成为新的leader。除了当前leader以外,每个partition分区都有一个首选leader,该leader是在创建topic时选定的,之所以把它叫做首选leader,是因为在创建partition时,需要在broker中均衡leader的分布,所以我们会希望首选leader就是当前leader,配置auto.leader.rebalance为true时,会检查首选leader是不是当前ledaer,如果不是,就会触发选举,让首选leader成为当前leader。

kafka如何处理producer和consumer的请求:
kafka提供了一个二进制协议,制定了请求消息的格式及broker如何对请求做出响应。
元数据请求:响应包括topic所包含的分区,分区有哪些replica,哪个是leader等,相当于路由请求。
生产请求:leader收到该请求后会做一些校验,包括该用户是否有权限,acks值是否有效等,然后broker会将消息写入文件系统缓存中,不确定何时会被刷到磁盘上。
获取请求:kafka通过 零复制 技术返回响应,也就是会将消息直接从文件系统中发送到网络通道中,这项技术避免了字节复制,获得了更好的性能。客户端读取的数据都是被所有ISR成员同步过的数据,不会获取到没有同步的数据,保证了数据的一致性。

由于kakfa不想将offset等元数据通过zookeeper来管理了,所以新增加了几个请求用来保存元数据等信息,比如offsetCommitRequest请求。

kafka的存储细节,包括文件格式和索引:

七、可靠性保证
1.broker的配置:
1.replication.refactor:复制系数。如果将复制系数设为N,那么在N-1个broker失效的情况下(kafka会确保分区的每个副本被分布在不同的broker上),仍然能够提供服务,所以更高的复制系数带来了更高的可用性及可靠性。但是有N个副本后,会占用N倍的磁盘空间,此时需要在可用性及磁盘空间中做出取舍,建议设置为3个。
2.unclean.leader.election:不完全leader选举,默认为true。不允许不同步的副本成为leader,这样设置的好处是当leader不可用时,只会在同步的副本中选出leader,但如果此时没有同步的副本,就会等待原先的leader重新上线,降低了kafka的性能,保证了可靠性。
3.min.insync.replica:最少同步副本。保证消息被min.insync.replica个副本确认后,才被认为是已提交的,要是想提高kafka的可靠性,最好把这个值设置的大一点,比如设置为2,这样的话,最少要有两个副本都同步了这条消息,才会被提交。可是这样的话,如果有两个副本变为不可用了,那么broker就会停止接收生产者的请求,降低了性能。
(根据以上三点,我们可以把参数设置的保险一点,提高kafka的可靠性,但这样是以牺牲kafka的性能为代价的)
2.生产者的配置:
1.ack,前面已经提到过了,有三个值,设置为all的话,会等待所有同步副本确认后,才会收到响应。
2.retries,发送消息失败后的重试次数,这样可以保证消息“至少被保存一次”,但是可能会出现重复发送消息的情况,这时可以在应用程序的消息中添加唯一标识符,保证消息不被重复消费,保证“幂等”性。
3.消费者的配置:
1.enable.auto.commit:设置是否自动提交offset,默认是true,不过为了避免出现重复数据和消息丢失,应设置为false。
2.auto.offset.reset:consumer在读取一个没有offset的分区时,应该怎么办,默认是latest,从最新的开始读取,另一个参数是earliest,从最开始的消息开始读取。

kafka权威指南-总结相关推荐

  1. kafka权威指南_Kafka-分区、片段、偏移量

    [分区.片段.偏移量] 1. 每个分区是由多个Segment组成,当Kafka要写数据到一个partition时,它会写入到状态为active的segment中.如果该segment被写满,则一个新的 ...

  2. 送5本《Kafka权威指南》第二版

    文末送书 科学家们每一次发生分歧都是因为掌握的数据不够充分.所以,我们可以先就获取哪一类数据达成一致,只要获取了数据,问题也就迎刃而解了.要么我是对的,要么你是对的,要么我们都是错的,然后继续. -- ...

  3. 【Kafka】《Kafka权威指南》入门

    发布与订阅消息系统 在正式讨论Apache Kafka (以下简称Kafka)之前,先来了解发布与订阅消息系统的概念, 并认识这个系统的重要性.数据(消息)的发送者(发布者)不会直接把消息发送给接收 ...

  4. 《Kafka权威指南(第二版)》内容摘要

    下图包含前7章主要内容,主要是业务应用程序开发人员使用Kafka时需要掌握的知识点

  5. 《Kafka权威指南》记录

    生产者 生产流程 32页 生产者创建 Kafka生产者需要三个必须参数:broker地址清单,key和value的序列化方式 (如StringSerializer) 生产者发送 ACKS acks 参 ...

  6. 《Kafka权威指南》读书笔记3 Kafka生产者

    日常业务开发很重要.很常用的一章 提纲:如何使用Kafka生产者:如何创建KafkaProducer.ProducerRecords:如何将记录发给Kafka:如何处理从Kafka返回的错误:一些配置 ...

  7. 《kafka权威指南》学习记录1

    本博客只作为自己学习的一个记录. 一.kafka生产者 1.kafka生产者组件 main线程 send线程 producerrecord对象 序列化器 分区器. Producerrecord对象格式 ...

  8. 《Kafka权威指南》——问题1——onParitionsAssigned

    四.Kafka消费者--从Kafka读取数据 4.8 从特定偏移量处开始处理数据 4.7节中说到,在调用subcribe()方法时传进去一个ConsumerRebalanceListener实例,可以 ...

  9. 《kafka权威指南》之可靠的数据传输

    文章目录 可靠性保证 Kafka做出的四个保证 kafka可靠性保证的核心 kafka的复制机制 不恰当的垃圾回收配置(**) broker配置 复制系数1 不完全的首领选举2 最少同步副本3 可靠的 ...

  10. 如何使用Kafka可靠地发送消息-《Kafka权威指南(第二版)》阅读笔记

    可靠性是系统而不是某个独立组件的一个属性,所以,在讨论Kafka的可靠性保证时,需要从系统的整体出发.说到可靠性,那些与Kafka集成的系统与Kafka本身一样重要.正因为可靠性是系统层面的概念,所以 ...

最新文章

  1. 升维:ValueError: all the input arrays must have same number of dimensions,
  2. 利用EntityFramework获得双色球数据库
  3. jQuery插件开发的基本形式
  4. django之Layui界面点击弹出个对话框并请求逻辑生成分页的动态表格
  5. 一道无限级分类题的 PHP 实现
  6. eclipse android 慢,Android编译很慢(使用Eclipse)
  7. 官方钦定TensorFlow2.0要改这个API,用户吐槽:全世界都是keras
  8. 大数据技术有什么优势
  9. R语言︱R社区的简单解析(CRAN、CRAN Task View)
  10. c语言将输入的字母串转为数字,C语言把字符串转换为数字
  11. 2022年政府工作报告词频分析
  12. 基于Unity的VR迷宫游戏项目技术分享
  13. BZOJ_1022_[SHOI2008]_小约翰的游戏John_(博弈论_反Nim游戏)
  14. 分析网络钓鱼的原理及防御措施
  15. uci数据集中的缺失数据_从uci早期糖尿病风险预测数据集中创建分类器
  16. html蔡徐坤邀请你打篮球源码,蔡徐坤给IKUN“讲笑话”,想邀请粉丝打篮球,网友:你打球像...
  17. 斐讯路由器虚拟服务器怎么设置,路由器端口映射如何设置 端口映射有什么用...
  18. vscode查看变量及函数列表
  19. c语言实验植物与颜色,植物的光合作用曲线比较.doc
  20. Tableau-帕累托图制作

热门文章

  1. Python游戏编程快速上手
  2. TwinCAT3中台达A3伺服使用
  3. 蓝桥杯历届真题题目+解析+代码+答案(2013-2020)(JavaA、B、C组)(C++语言)(Python)
  4. 单片机控制步进电机程序c语言正反转停止,51单片机控制步进电机的启动、停止、正转、反转...
  5. 使用VB6.0编写管家婆服装----百胜服装ERP数据转换程序和通用SQL server查询程序的心得
  6. 机房服务器存放位置要求,服务器机房建设设计要求规范.doc
  7. java定义vip顾客继承顾客_Java初级教频教程 - JavaSE - Java - 私塾在线 - 只做精品视频课程服务...
  8. Service自动被销毁?
  9. 新中大财务软件服务器路径修改,新中大软件最常用的操作手册
  10. 后台审批功能 销售发货单 生成 销售出库单 java NC633 接口开发