Kafka的坑: 消费者无法消费消息
问题:今天使用Kafka做一个小DEMO,但运行的时候,可以在IDEA控制台上看到生产者发送的消息,无法在IDEA看到消费者在消费消息,但通过连接Linux在命令行可以看到消费者消费的消息。
生产者应该是没有问题的,给出消费者的代码:
/*** Kafka消费者*/
public class KafkaConsumer extends Thread{private String topic;public KafkaConsumer(String topic) {this.topic = topic;}private ConsumerConnector createConnector(){Properties properties = new Properties();properties.put("zookeeper.connect", KafkaProperties.ZK);properties.put("group.id",KafkaProperties.GROUP_ID);properties.put("rebalance.max.retries", "10");properties.put("rebalance.backoff.ms", "2000");return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));}@Overridepublic void run() {ConsumerConnector consumer = createConnector();Map<String, Integer> topicCountMap = new HashMap<String, Integer>();topicCountMap.put(topic, 1);//一直就卡在这里,没有办法获取接受到的消息流Map<String, List<KafkaStream<byte[], byte[]>>> messageStream = consumer.createMessageStreams(topicCountMap);KafkaStream<byte[], byte[]> stream = messageStream.get(topic).get(0); ConsumerIterator<byte[], byte[]> iterator = stream.iterator();while (iterator.hasNext()) {String message = new String(iterator.next().message());System.out.println("rec: " + message);}}
}
通过源码跟踪:
进入上面这个方法里面的最后一行
最后的错误定位在上面这个图里面,大致意思是:“不知道注册broker版本号,仅仅支持1和2”
解决办法:通过上面的定位已经指定是哪里错误了,是kafka注册进zookeeper中的这个路径 /broker/ids/0 里面的版本号错误,我登陆zk的客户端,将里面的version更换成1在重试,发现就成功了。
疑问:zk下面的这个目录是第一次启动kafka时候创建的,不知道为什么,我第一次启动时候的version竟然是4,kafka我用的版本是kafka_2.11_0.11.0.0,是我配置还是哪里的问题呀?
Kafka的坑: 消费者无法消费消息相关推荐
- Kafka学习(十)--Kafka消费者Consumer消费消息配置实战
一. Kafka消费者Consumer消费消息配置实战 配置: public static Properties getProperties() {Properties props = new Pro ...
- kafka设置起止时间消费消息
消费kafka消息时,有时可能需要消费某个时间段的消息,写个demo记录下: public class KafkaConsumerByTime {public static void main(Str ...
- 利用Kafka发送/消费消息-Java示例
利用Kafka发送/消费消息-Java示例 当使用命令行工具把基本的组件运行起来后,再使用Java client就很简单,这里是入门的第一个Java客户端程序,有很多需要深入理解的地方. 依赖配置 & ...
- kafka_消费者组消费进度监控实现
对于 Kafka 消费者,最重要的就是监控它们的消费进度,或者说监控它们消费的滞后程度(消费者 Lag 或 Consumer Lag). 所谓滞后程度,就是指消费者当前落后于生产者的程度.Lag 的单 ...
- kafka 基础概念、命令行操作(查看所有topic、创建topic、删除topic、查看某个Topic的详情、修改分区数、发送消息、消费消息、 查看消费者组 、更新消费者的偏移位置)
文章目录 前言 1. 基础概念 Broker Producer Consumer Consumer Group Topic Partition Replica 2. 命令行操作 2.1 查看所有top ...
- spark spark streaming + kafka receiver方式消费消息
2019独角兽企业重金招聘Python工程师标准>>> kafka + spark streaming 集群 前提: spark 安装成功,spark 1.6.0 zookeeper ...
- kafka中生产者是如何把消息投递到哪个分区的?消费者又是怎么选择分区的?...
作者 | 废物大师兄 来源 | https://www.cnblogs.com/cjsblog/p/9664536.html 1. 前言 我们知道,生产者发送消息到主题,消费者订阅主题(以消费者组的名 ...
- kafka多个消费者消费一个topic_kafka:一文读懂消费者背后的那点quot;猫腻quot;
来自:z小赵 前言 经过前几篇文章的介绍,大致了解了生产者背后的运行原理.消息有生产就得有人去消费,今天我们就来介绍下消费端消费消息背后发生的那点事儿. 文章概览 消费者与消费组的"父子关系 ...
- 【kafka】一次磁盘故障后消费者无法消费
1.概述 转载:Kafka 运维纪实 – 一次磁盘故障后消费者无法消费 Kafka 自从某个版本加入 isr 等机制后真是越来越复杂了,再也不是原来那个单纯的 The log 了,碰上网络有个什么风吹 ...
最新文章
- 《数据库系统实训》实验报告——事务的应用
- RedHat7.0 设置weblogic开机自启动
- [Python图像处理] 九.形态学之图像开运算、闭运算、梯度运算
- 数据结构——最小生成树之prime算法(与最短路径之迪杰斯特拉算法很像)
- 根据中文修改英文翻译,重新生成英文翻译文件(保证原有文件的顺序不变)
- 30岁+程序员职场攻略:找到自己的“职业锚”乘风破浪
- git撤销commit到未提交状态_Git在4个阶段5种状态下的撤销操作
- 男女择偶基本心理类型
- ESXi vSphere Client中copy paste如何启用
- CentOS7安装mysql数据库完整过程以及安装中遇到的各种问题的解决方案
- 树(2)-----leetcode(层、深度、节点)
- Intent的一些简单用法
- 基于ASP.NETAJAX的WebPart开发与部署-转
- MySQL: 为什么使用 innobackupex 备份恢复搭建主从时,必须人为设置 gtid_purged 变量
- chcp 437>nul graftabl 936>nul
- FFMPEG框架学习——(2)视频的提取和解码
- teamviewer检测到商业用途的解决办法(win7 win8 win10亲测) 修改teamviewerID
- Web(万维网)发展简史
- PROFINET的GSD文件描述
- uni-app打开外链