问题:今天使用Kafka做一个小DEMO,但运行的时候,可以在IDEA控制台上看到生产者发送的消息,无法在IDEA看到消费者在消费消息,但通过连接Linux在命令行可以看到消费者消费的消息。

生产者应该是没有问题的,给出消费者的代码:

/*** Kafka消费者*/
public class KafkaConsumer extends Thread{private String topic;public KafkaConsumer(String topic) {this.topic = topic;}private ConsumerConnector createConnector(){Properties properties = new Properties();properties.put("zookeeper.connect", KafkaProperties.ZK);properties.put("group.id",KafkaProperties.GROUP_ID);properties.put("rebalance.max.retries", "10");properties.put("rebalance.backoff.ms", "2000");return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));}@Overridepublic void run() {ConsumerConnector consumer = createConnector();Map<String, Integer> topicCountMap = new HashMap<String, Integer>();topicCountMap.put(topic, 1);//一直就卡在这里,没有办法获取接受到的消息流Map<String, List<KafkaStream<byte[], byte[]>>> messageStream =  consumer.createMessageStreams(topicCountMap);KafkaStream<byte[], byte[]> stream = messageStream.get(topic).get(0);   ConsumerIterator<byte[], byte[]> iterator = stream.iterator();while (iterator.hasNext()) {String message = new String(iterator.next().message());System.out.println("rec: " + message);}}
}

通过源码跟踪:


进入上面这个方法里面的最后一行


最后的错误定位在上面这个图里面,大致意思是:“不知道注册broker版本号,仅仅支持1和2”
解决办法:通过上面的定位已经指定是哪里错误了,是kafka注册进zookeeper中的这个路径 /broker/ids/0 里面的版本号错误,我登陆zk的客户端,将里面的version更换成1在重试,发现就成功了。
疑问:zk下面的这个目录是第一次启动kafka时候创建的,不知道为什么,我第一次启动时候的version竟然是4,kafka我用的版本是kafka_2.11_0.11.0.0,是我配置还是哪里的问题呀?

Kafka的坑: 消费者无法消费消息相关推荐

  1. Kafka学习(十)--Kafka消费者Consumer消费消息配置实战

    一. Kafka消费者Consumer消费消息配置实战 配置: public static Properties getProperties() {Properties props = new Pro ...

  2. kafka设置起止时间消费消息

    消费kafka消息时,有时可能需要消费某个时间段的消息,写个demo记录下: public class KafkaConsumerByTime {public static void main(Str ...

  3. 利用Kafka发送/消费消息-Java示例

    利用Kafka发送/消费消息-Java示例 当使用命令行工具把基本的组件运行起来后,再使用Java client就很简单,这里是入门的第一个Java客户端程序,有很多需要深入理解的地方. 依赖配置 & ...

  4. kafka_消费者组消费进度监控实现

    对于 Kafka 消费者,最重要的就是监控它们的消费进度,或者说监控它们消费的滞后程度(消费者 Lag 或 Consumer Lag). 所谓滞后程度,就是指消费者当前落后于生产者的程度.Lag 的单 ...

  5. kafka 基础概念、命令行操作(查看所有topic、创建topic、删除topic、查看某个Topic的详情、修改分区数、发送消息、消费消息、 查看消费者组 、更新消费者的偏移位置)

    文章目录 前言 1. 基础概念 Broker Producer Consumer Consumer Group Topic Partition Replica 2. 命令行操作 2.1 查看所有top ...

  6. spark spark streaming + kafka receiver方式消费消息

    2019独角兽企业重金招聘Python工程师标准>>> kafka + spark streaming 集群 前提: spark 安装成功,spark 1.6.0 zookeeper ...

  7. kafka中生产者是如何把消息投递到哪个分区的?消费者又是怎么选择分区的?...

    作者 | 废物大师兄 来源 | https://www.cnblogs.com/cjsblog/p/9664536.html 1. 前言 我们知道,生产者发送消息到主题,消费者订阅主题(以消费者组的名 ...

  8. kafka多个消费者消费一个topic_kafka:一文读懂消费者背后的那点quot;猫腻quot;

    来自:z小赵 前言 经过前几篇文章的介绍,大致了解了生产者背后的运行原理.消息有生产就得有人去消费,今天我们就来介绍下消费端消费消息背后发生的那点事儿. 文章概览 消费者与消费组的"父子关系 ...

  9. 【kafka】一次磁盘故障后消费者无法消费

    1.概述 转载:Kafka 运维纪实 – 一次磁盘故障后消费者无法消费 Kafka 自从某个版本加入 isr 等机制后真是越来越复杂了,再也不是原来那个单纯的 The log 了,碰上网络有个什么风吹 ...

最新文章

  1. 《数据库系统实训》实验报告——事务的应用
  2. RedHat7.0 设置weblogic开机自启动
  3. [Python图像处理] 九.形态学之图像开运算、闭运算、梯度运算
  4. 数据结构——最小生成树之prime算法(与最短路径之迪杰斯特拉算法很像)
  5. 根据中文修改英文翻译,重新生成英文翻译文件(保证原有文件的顺序不变)
  6. 30岁+程序员职场攻略:找到自己的“职业锚”乘风破浪
  7. git撤销commit到未提交状态_Git在4个阶段5种状态下的撤销操作
  8. 男女择偶基本心理类型
  9. ESXi vSphere Client中copy paste如何启用
  10. CentOS7安装mysql数据库完整过程以及安装中遇到的各种问题的解决方案
  11. 树(2)-----leetcode(层、深度、节点)
  12. Intent的一些简单用法
  13. 基于ASP.NETAJAX的WebPart开发与部署-转
  14. MySQL: 为什么使用 innobackupex 备份恢复搭建主从时,必须人为设置 gtid_purged 变量
  15. chcp 437>nul graftabl 936>nul
  16. FFMPEG框架学习——(2)视频的提取和解码
  17. teamviewer检测到商业用途的解决办法(win7 win8 win10亲测) 修改teamviewerID
  18. Web(万维网)发展简史
  19. PROFINET的GSD文件描述
  20. uni-app打开外链

热门文章

  1. 计算机图形学基础知识-三维变换
  2. 软件测试怎么有效的降低测试轮次
  3. 计算机管理说文件缺失lnk,Win10打开计算管理找不到文件Server manager.lnk
  4. Spring支持的几种bean的作用域
  5. Python 学习笔记 类的封装 类的继承 多态继承 类方法和静态方法 单例设计模式
  6. WIFI学习六(SNTP)
  7. i5 1135g7和i7 1165g7核显一样吗 i51135g7和i71165g7的相差大吗
  8. 字符串的模式匹配(KMP)算法
  9. Kali2.0关于flash的简单安装
  10. Jieba分词Python简单实现