1.ZookeeperConsumer架构

ZookeeperConsumer类中consumer运行过程架构图:

图1

过程分析:

ConsumerGroupExample类

2.消费者线程(consumer thread),队列,拉取线程(fetch thread)三者之间关系

每一个topic至少需要创建一个consumer thread,如果有多个partitions,则可以创建多个consumer thread线程,consumer thread>==partitions数量,否则会有consumer thread空闲。

部分代码示例如下:

ConsumerConnector consumer

consumer = kafka.consumer.Consumer.createJavaConsumerConnector(

createConsumerConfig());

Map<String, Integer> topicCountMap = new HashMap<String, Integer>();

topicCountMap.put("test-string-topic", new Integer(1));  //value表示consumer thread线程数量

Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);

具体说明一下三者关系:

(1).topic的partitions分布规则

paritions是安装kafka brokerId有序分配的。

例如现在有三个node安装了kafka broker服务端程序,brokerId分别设置为1,2,3,现在准备一个topic为test-string-topic,并且分配12个partitons,此时partitions的kafka broker节点分布情况为 ,partitions索引编号为0,3,6,9等4个partitions在brokerId=1上,1,4,7,10在brokerId=2上,2,5,8,11在brokerId=3上。

创建consumer thread

consumer thread数量与BlockingQueue一一对应。

a.当consumer thread count=1时

此时有一个blockingQueue1,三个fetch thread线程,该topic分布在几个node上就有几个fetch thread,每个fetch thread会于kafka broker建立一个连接。3个fetch thread线程去拉取消息数据,最终放到blockingQueue1中,等待consumer thread来消费。

消费者线程,缓冲队列,partitions分布列表如下

consumer线程

Blocking Queue

partitions

consumer thread1

blockingQueue1

0,1,2,3,4,5,6,7,8,9,10,11

fetch thread与partitions分布列表如下

fetch线程

partitions

fetch thread1

0,3,6,9

fetch thread2

1,4,7,10

fetch thread3

2,5,8,11

b. 当consumer thread count=2时

此时有consumerThread1和consumerThread2分别对应2个队列blockingQueue1,blockingQueue2,这2个消费者线程消费partitions依次为:0,1,2,3,4,5与6,7,8,9,10,11;消费者线程,缓冲队列,partitions分布列表如下

consumer线程

Blocking Queue

partitions

consumer thread1

blockingQueue1

0,1,2,3,4,5

consumer thread2

blockingQueue2

6,7,8,9,10,11

fetch thread与partitions分布列表如下

fetch线程

partitions

fetch thread1

0,3,6,9

fetch thread2

1,4,7,10

fetch thread3

2,5,8,11

c. 当consumer thread count=4时

消费者线程,缓冲队列,partitions分布列表如下

consumer线程

Blocking Queue

partitions

consumer thread1

blockingQueue1

0,1,2

consumer thread2

blockingQueue2

3,4,5

consumer thread3

blockingQueue3

6,7,8

consumer thread4

blockingQueue4

9,10,11

fetch thread与partitions分布列表如下

同上

同理当消费线程consumer thread count=n,都是安装上述分布规则来处理的。

3.consumer消息线程以及队列创建逻辑

运用ZookeeperConsumerConnector类创建多线程并行消费测试类,ConsumerGroupExample类初始化,调用createMessageStreams方法,实际是在consume方法处理的逻辑,创建KafkaStream,以及阻塞队列(LinkedBlockingQueue),KafkaStream与队列个数一一对应,消费者线程数量决定阻塞队列的个数。

registerConsumerInZK()方法:设置消费者组,注册消费者信息consumerIdString到zookeeper上。

consumerIdString产生规则部分代码如下:

[java] view plain copy
  1. String consumerUuid = null;
  2. if(config.consumerId!=null && config.consumerId)
  3. consumerUuid = consumerId;
  4. else {
  5. String uuid = UUID.randomUUID()
  6. consumerUuid = "%s-%d-%s".format(
  7. InetAddress.getLocalHost.getHostName, System.currentTimeMillis,
  8. uuid.getMostSignificantBits().toHexString.substring(0,8));
  9. }
  10. String consumerIdString =  config.groupId + "_" + consumerUuid;

kafka zookeeper注册模型结构或存储结构如下:

kafka在zookeeper中存储结构

说明:目前把kafka中绝大部分存储模型都列表出来了,当前还有少量不常使用的,暂时还没有列举,后续会加上。

consumer初始化逻辑处理:

1.实例化并注册loadBalancerListener监听,ZKRebalancerListener监听consumerIdString状态变化

触发consumer reblance条件如下几个:

ZKRebalancerListener:当/kafka01/consumer/[consumer-group]/ids子节点变化时,会触发

ZKTopicPartitionChangeListener:当该topic的partitions发生变化时,会触发。

val topicPath = "/kafka01/brokers/topics" + "/" + "topic-1"
      zkClient.subscribeDataChanges(topicPath, topicPartitionChangeListener)

原文:http://blog.csdn.net/lizhitao/article/details/38458631

kafka Consumer详解相关推荐

  1. Kafka Consumer 详解

    文章目录 消费者和消费者组 分区再均衡 创建消费者 自动提交偏移量 自动提交 offset 手动提交偏移量 同步提交 异步提交 消费者其他属性 消费者和消费者组 在 Kafka 中,消费者通常是消费者 ...

  2. Kafka 原理详解

    Kafka 原理详解 1 kakfa基础概念说明 Broker:消息服务器,就是我们部署的一个kafka服务 Partition:消息的水平分区,一个Topic可以有多个分区,这样实现了消息的无限量存 ...

  3. Kafka配置详解-Consumer配置

    转载自:http://orchome.com/535 3.4 kafka消费者配置 在0.9.0.0中,我们引入了新的Java消费者来替代早期基于Scala的简单和高级消费者.新老客户端的配置如下. ...

  4. python使用kafka原理详解_Python操作Kafka原理及使用详解

    Python操作Kafka原理及使用详解 一.什么是Kafka Kafka是一个分布式流处理系统,流处理系统使它可以像消息队列一样publish或者subscribe消息,分布式提供了容错性,并发处理 ...

  5. python使用kafka原理详解真实完整版_转:Kafka史上最详细原理总结 ----看完绝对不后悔...

    消息队列的性能好坏,其文件存储机制设计是衡量一个消息队列服务技术水平和最关键指标之一.下面将从Kafka文件存储机制和物理结构角度,分析Kafka是如何实现高效文件存储,及实际应用效果. 1.1  K ...

  6. python使用kafka原理详解真实完整版_史上最详细Kafka原理总结

    Kafka Kafka是最初由Linkedin公司开发,是一个分布式.支持分区的(partition).多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实 ...

  7. kafka 消费者详解

    前言 读完本文,你将了解到如下知识点: kafka 的消费者 和 消费者组 如何正确使用 kafka consumer 常用的 kafka consumer 配置 消费者 和 消费者组 什么是消费者? ...

  8. kafka实战教程(python操作kafka),kafka配置文件详解

    全栈工程师开发手册 (作者:栾鹏) 架构系列文章 应用往Kafka写数据的原因有很多:用户行为分析.日志存储.异步通信等.多样化的使用场景带来了多样化的需求:消息是否能丢失?是否容忍重复?消息的吞吐量 ...

  9. Kafka 安装详解

    注意:确保有JDK1.8版本及以上 官方文档:https://kafka.apache.org/quickstart 清华镜像下载:https://mirrors.tuna.tsinghua.edu. ...

最新文章

  1. python项目主界面_python项目案例
  2. 106. 动态中位数【经典 / 对顶堆】
  3. 云炬Android开发报错处理教程 解决Android Studio kotlin等依赖下载慢,下载超时失败的问题
  4. entity framework5 sqlserver2005 事务(TransactionScope)报未启用MSDTC错误解决办法
  5. oracle数据缓冲区作用,Oracle内存结构(02)--缓冲区
  6. C#LeetCode刷题之#88-合并两个有序数组(Merge Sorted Array)
  7. vue保存页面的值_vue前端页面跳转参数传递及存储
  8. gispython定义查询_Python与开源GIS:SpatiaLite 中的基本SQL数据库查询用法
  9. Pytorch 尝试通过强化cpu使用加快训练和推理速度(二)
  10. phpstorm 2019.1 mac
  11. 周志华《机器学习》读书笔记与习题答案(持续更新)
  12. sybase 事务 超时返回_分布式事务设计与实践-消息最终一致性
  13. java watir_watir webdriver 安装
  14. 印刷电路板(PCB)基础
  15. 科技爱好者周刊:第 75 期
  16. 德国质量链接中国速度,奥迪一汽新能源汽车有限公司在电动化赛道上全速奔跑
  17. java编译(打包)完成,导致Excel文件损坏的问题
  18. PSI成长之路_配置并成功运行
  19. textfield监听输入汉字数量
  20. android手机应用程序开发,Android手机应用程序开发标准

热门文章

  1. Backdrop Filter
  2. Android高级-贝塞尔曲线与计算规则
  3. 荣耀FlyPods3体验 优秀的方式不仅一种
  4. html5实现边框为圆圈,CSS怎样做出自适应圆形边框?
  5. Android 仿今日头条评论时键盘自动弹出的效果
  6. 【SPSS】二项分布检验详细操作教程(附案例实战)
  7. HTML全角空格语法
  8. python图片裁剪_Python图片裁剪(如: 头像裁剪)
  9. Java: ParameterizedType用法与简介
  10. 行业的英语术语大全之化妆品