Producer消息发送

producer.send(msg); // 用类似这样的方式去发送消息,就会把消息给你均匀的分布到各个分区上去
producer.send(key, msg); // 订单id,或者是用户id,他会根据这个key的hash值去分发到某个分区上去,他可以保证相同的key会路由分发到同一个分区上去。
每次发送消息都必须先把数据封装成一个ProducerRecord对象,里面包含了要发送的topic,具体在哪个分区,分区key,消息内容,timestamp时间戳,然后这个对象交给序列化器,变成自定义协议格式的数据,接着把数据交给partitioner分区器,对这个数据选择合适的分区,默认就轮询所有分区,或者根据key来hash路由到某个分区,这个topic的分区信息,都是在客户端会有缓存的,当然会提前跟broker去获取。接着这个数据会被发送到producer内部的一块缓冲区里,然后producer内部有一个Sender线程,会从缓冲区里提取消息封装成一个一个的batch,然后每个batch发送给分区的leader副本所在的broker。

常见异常处理
常见的异常如下:

1)LeaderNotAvailableException:某台机器挂了,此时leader副本不可用,会导致你写入失败,要等待其他follower副本切换为leader副本之后,才能继续写入,此时可以重试发送即可。
2)NotControllerException:这个也是同理,如果说Controller所在Broker挂了,那么此时会有问题,需要等待Controller重新选举,此时也是一样就是重试即可
3)NetworkException:网络异常,重试即可
参数:retries 默认值是3
参数:retry.backoff.ms 两次重试之间的时间间隔

提升消息吞吐量

1)buffer.memory:设置发送消息的缓冲区,默认值是33554432,就是32MB
如果发送消息出去的速度小于写入消息进去的速度,就会导致缓冲区写满,此时生产消息就会阻塞住,所以说这里就应该多做一些压测,尽可能保证说这块缓冲区不会被写满导致生产行为被阻塞住

   Long startTime=System.currentTime();

producer.send(record, new Callback() {

     @Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if(exception == null) {// 消息发送成功System.out.println("消息发送成功");  } else {// 消息发送失败,需要重新发送}}});Long endTime=System.currentTime();If(endTime - startTime > 100){//说明内存被压满了说明有问题

}

2)compression.type,默认是none,不压缩,但是也可以使用lz4压缩,效率还是不错的,压缩之后可以减小数据量,提升吞吐量,但是会加大producer端的cpu开销
3)batch.size,设置meigebatch的大小,如果batch太小,会导致频繁网络请求,吞吐量下降;如果batch太大,会导致一条消息需要等待很久才能被发送出去,而且会让内存缓冲区有很大压力,过多数据缓冲在内存里
默认值是:16384,就是16kb,也就是一个batch满了16kb就发送出去,一般在实际生产环境,这个batch的值可以增大一些来提升吞吐量,可以自己压测一下
4)linger.ms,这个值默认是0,意思就是消息必须立即被发送,但是这是不对的,一般设置一个100毫秒之类的,这样的话就是说,这个消息被发送出去后进入一个batch,如果100毫秒内,这个batch满了16kb,自然就会发送出去。但是如果100毫秒内,batch没满,那么也必须把消息发送出去了,不能让消息的发送延迟时间太长,也避免给内存造成过大的一个压力。

请求超时
1)max.request.size:这个参数用来控制发送出去的消息的大小,默认是1048576字节,也就1mb,这个一般太小了,很多消息可能都会超过1mb的大小,所以需要自己优化调整,把他设置更大一些(企业一般设置成10M)
2)request.timeout.ms:这个就是说发送一个请求出去之后,他有一个超时的时间限制,默认是30秒,如果30秒都收不到响应,那么就会认为异常,会抛出一个TimeoutException来让我们进行处理

ACK参数
acks参数,其实是控制发送出去的消息的持久化机制的
1)如果acks=0,那么producer根本不管写入broker的消息到底成功没有,发送一条消息出去,立马就可以发送下一条消息,这是吞吐量最高的方式,但是可能消息都丢失了,你也不知道的,但是说实话,你如果真是那种实时数据流分析的业务和场景,就是仅仅分析一些数据报表,丢几条数据影响不大的。会让你的发送吞吐量会提升很多,你发送弄一个batch出,不需要等待人家leader写成功,直接就可以发送下一个batch了,吞吐量很大的,哪怕是偶尔丢一点点数据,实时报表,折线图,饼图。

2)acks=all,或者acks=-1:这个leader写入成功以后,必须等待其他ISR中的副本都写入成功,才可以返回响应说这条消息写入成功了,此时你会收到一个回调通知

3)acks=1:只要leader写入成功,就认为消息成功了,默认给这个其实就比较合适的,还是可能会导致数据丢失的,如果刚写入leader,leader就挂了,此时数据必然丢了,其他的follower没收到数据副本,变成leader

如果要想保证数据不丢失,得如下设置:
a)min.insync.replicas = 2,ISR里必须有2个副本,一个leader和一个follower,最最起码的一个,不能只有一个leader存活,连一个follower都没有了

b)acks = -1,每次写成功一定是leader和follower都成功才可以算做成功,leader挂了,follower上是一定有这条数据,不会丢失

c) retries = Integer.MAX_VALUE,无限重试,如果上述两个条件不满足,写入一直失败,就会无限次重试,保证说数据必须成功的发送给两个副本,如果做不到,就不停的重试,除非是面向金融级的场景,面向企业大客户,或者是广告计费,跟钱的计算相关的场景下,才会通过严格配置保证数据绝对不丢失

重试乱序
消息重试是可能导致消息的乱序的,因为可能排在你后面的消息都发送出去了,你现在收到回调失败了才在重试,此时消息就会乱序,所以可以使用“max.in.flight.requests.per.connection”参数设置为1,这样可以保证producer同一时间只能发送一条消息

Consumer架构

Offset管理

每个consumer内存里数据结构保存对每个topic的每个分区的消费offset,定期会提交offset,老版本是写入zk,但是那样高并发请求zk是不合理的架构设计,zk是做分布式系统的协调的,轻量级的元数据存储,不能负责高并发读写,作为数据存储。所以后来就是提交offset发送给内部topic:__consumer_offsets,提交过去的时候,key是group.id+topic+分区号,value就是当前offset的值,每隔一段时间,kafka内部会对这个topic进行compact。也就是每个group.id+topic+分区号就保留最新的那条数据即可。而且因为这个 __consumer_offsets可能会接收高并发的请求,所以默认分区50个,这样如果你的kafka部署了一个大的集群,比如有50台机器,就可以用50台机器来抗offset提交的请求压力,就好很多。

Coordinator

Coordinator的作用
每个consumer group都会选择一个broker作为自己的coordinator,他是负责监控这个消费组里的各个消费者的心跳,以及判断是否宕机,然后开启rebalance,
根据内部的一个选择机制,会挑选一个对应的Broker,Kafka总会把你的各个消费组均匀分配给各个Broker作为coordinator来进行管理的,consumer group中的每个consumer刚刚启动就会跟选举出来的这个consumer group对应的coordinator所在的broker进行通信,然后由coordinator分配分区给你的这个consumer来进行消费。coordinator会尽可能均匀的分配分区给各个consumer来消费。
如何选择哪台是coordinator
首先对消费组的groupId进行hash,接着对__consumer_offsets的分区数量取模,默认是50,可以通过offsets.topic.num.partitions来设置,找到你的这个consumer group的offset要提交到__consumer_offsets的哪个分区。比如说:groupId,“membership-consumer-group” -> hash值(数字)-> 对50取模 -> 就知道这个consumer group下的所有的消费者提交offset的时候是往哪个分区去提交offset,找到__consumer_offsets的一个分区,__consumer_offset的分区的副本数量默认来说1,只有一个leader,然后对这个分区找到对应的leader所在的broker,这个broker就是这个consumer group的coordinator了,consumer接着就会维护一个Socket连接跟这个Broker进行通信。

Rebalance策略

比如我们消费的一个主题有12个分区:
p0,p1,p2,p3,p4,p5,p6,p7,p8,p9,p10,p11
假设我们的消费者组里面有三个消费者
1.range策略
range策略就是按照partiton的序号范围
p0~3 consumer1
p4~7 consumer2
p8~11 consumer3
默认就是这个策略;

2.round-robin策略
consumer1:0,3,6,9
consumer2:1,4,7,10
consumer3:2,5,8,11

但是前面的这两个方案有个问题:
假设consuemr1挂了:p0-5分配给consumer2,p6-11分配给consumer3
这样的话,原本在consumer2上的的p6,p7分区就被分配到了 consumer3上。

3.sticky策略
最新的一个sticky策略,就是说尽可能保证在rebalance的时候,让原本属于这个consumer
的分区还是属于他们,
然后把多余的分区再均匀分配过去,这样尽可能维持原来的分区分配的策略

consumer1:0-3
consumer2: 4-7
consumer3: 8-11
假设consumer3挂了
consumer1:0-3,+8,9
consumer2: 4-7,+10,11

Rebalance分代机制
在rebalance的时候,可能你本来消费了partition3的数据,结果有些数据消费了还没提交offset,结果此时rebalance,把partition3分配给了另外一个cnosumer了,此时你如果提交partition3的数据的offset,能行吗?必然不行,所以每次rebalance会触发一次consumer group generation,分代,每次分代会加1,然后你提交上一个分代的offset是不行的,那个partiton可能已经不属于你了,大家全部按照新的partiton分配方案重新消费数据。

Consumer核心参数

【heartbeat.interval.ms】
consumer心跳时间,必须得保持心跳才能知道consumer是否故障了,然后如果故障之后,就会通过心跳下发rebalance的指令给其他的consumer通知他们进行rebalance的操作

【session.timeout.ms】
kafka多长时间感知不到一个consumer就认为他故障了,默认是10秒

【max.poll.interval.ms】
如果在两次poll操作之间,超过了这个时间,那么就会认为这个consume处理能力太弱了,会被踢出消费组,分区分配给别人去消费,一遍来说结合你自己的业务处理的性能来设置就可以了

【fetch.max.bytes】
获取一条消息最大的字节数,一般建议设置大一些

【max.poll.records】
一次poll返回消息的最大条数,默认是500条

【connection.max.idle.ms】
consumer跟broker的socket连接如果空闲超过了一定的时间,此时就会自动回收连接,但是下次消费就要重新建立socket连接,这个建议设置为-1,不要去回收

【auto.offset.reset】
earliest
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
topica -> partition0:1000
partitino1:2000
latest
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从当前位置开始消费
none
topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

【enable.auto.commit】
这个就是开启自动提交唯一

【auto.commit.ineterval.ms】
这个指的是多久条件一次偏移量

Kafka一些参数配置相关推荐

  1. kafka config参数配置简要说明

    #kafka参数配置 #等待所有的replicas都响应后才返回响应,可靠性最高,但会降低吞吐率 acks=all #发生Retriable exceptions可重试异常时,重试发送消息次数,最大为 ...

  2. kafka生产者参数配置,灵魂拷问

    网站基础知识 (网站架构及其演变过程+常见协议和标准+DNS的设置+Java中Socket的用法+HTTP协议+详解Servlet+Tomcat分析) 俯视 Spring MVC (SpringMVC ...

  3. 五面拿下阿里飞猪offer,kafka生产者参数配置

    Netty实战 无论是想要学习Spring 5 .Spark.Cassandra等这样的系统,还是通过学习Netty来构建自己的基于Java的高性能网络框架,或者是更加具体的高性能Web或者游戏服务器 ...

  4. CC00038.kafka——|Hadoopkafka.V23|——|kafka.v23|消费者拦截器参数配置|

    一.消费者拦截器参数配置:消费者参数配置补充 配置项 说明 bootstrap.servers 建立到Kafka集群的初始连接用到的host/port列表. 客户端会使用这里指定的所有的host/po ...

  5. kafka java jvm 优化_kafka优化–JVM参数配置优化

    主要是启动脚本和log4j基本参数的设置和优化,这些参数藏的比较深. 1.JVM参数配置优化 如果使用的CMS GC算法,建议JVM Heap不要太大,在4GB以内就可以.JVM太大,导致Major ...

  6. kafka集群参数配置(下)

    今天我们继续来聊那些重要的 Kafka 集群配置,下半部分主要是 Topic 级别参数.JVM 参数以及操作系统参数的设置. 在上一期中,我们讨论了 Broker 端参数设置的一些法则,但其实 Kaf ...

  7. kafka server broker参数配置

    kafka broker的server配置路径为 $ZOOKEEPER_HOME/config/server.properties中,可根据需要进行配置 kafka broker 核心配置 #brok ...

  8. ELK5.3+Kafka集群配置

    [一]资源准备 # 3台4C*8G, 安装Zookeeper.Kafka.Logstash--Broker(input: filebeat; output: Kafka) 10.101.2.23 10 ...

  9. Spark性能调优系列:Spark参数配置大全(官网资料)

    Spark参数配置大全 Spark提供了三个位置来配置系统 Spark属性控制大多数应用程序参数,可以使用SparkConf对象或Java系统属性来设置. 通过conf/spark-env.sh每个节 ...

最新文章

  1. python画-使用python画个小猪佩奇的示例代码
  2. NFS、SSH、SAMBA
  3. Ext JS 4 Beta 3 今天可以下载了
  4. 杂项-权限管理:RBAC
  5. mysql 复制延迟诊断_新特性解读 | MySQL 8 复制延迟观测新方式,更全面更精准
  6. P2421 A-B数对(增强版)
  7. Redis学习总结(17)——Redis 持久化和过期机制复习
  8. unity图片模糊处理
  9. 记录repast4py环境配置
  10. 【人工智能】人工智能是20世纪50年代中期兴起的一门新兴边缘科学
  11. Mac怎么读写NTFS格式?Mac电脑重新安装NTFS卷
  12. 2020抖音最新上热门技巧你知道多少?
  13. 学习笔记:使用requests+Beautiful4爬取优美图库
  14. NumPy 学习 第三篇:矢量化和广播
  15. 大学想选择计算机专业,零基础如何快速学习编程?都是经验之谈
  16. MarkDown-如何插入上划线,下划线,中划线汇总
  17. HTML5作业:美食网站设计(浮动的使用)
  18. Very Deep Convolutional Networks for Large-Scale Image Recognition—VGG论文翻译—中文版
  19. 30天入门 Android 开发, Google 与你一起圆梦
  20. Jenkins的下载安装及入门使用

热门文章

  1. CListControl的OnMouseMove和OnNcHitTest
  2. 打印纸张尺寸换算_纸张尺寸与开(K)数换算
  3. python骨灰技巧_Pandas技巧,某骨灰级Pythoner经验总结,呕心沥血!
  4. 用ajax传值input file,获取 input type=file 标签的内容,并使用ajax进行请求到服务器...
  5. 印刷体是什么意思_家长晒出4年级小学霸课前笔记,字迹堪比“印刷体”,老师都羡慕...
  6. linux摄像头内核驱动开发,怎么在Linux下开发摄像头驱动
  7. 浏览器无法运行java_window_浏览器无法运行JAVA脚本的解决方法,1、浏览器无法运行JAVA脚本的 - phpStudy...
  8. java如何画百分比圆环_canvas绘制百分比圆环进度条
  9. java代码中 作用_Java利用开发中代码生成工具的作用
  10. php添加填空,PHP之preg_replace_callback(),将填空题的[[]]替换成______