kafka消费者api分为high api和low api,目前上述demo是都是使用kafka high api,高级api不用关心维护消费状态信息和负载均衡,不用关心offset。
高级api的一些注意事项:
1. 如果consumer group中的consumer线程数量比partition多,那么有的线程将永远不会收到消息。
因为kafka的设计是在一个partition上是不允许并发的,所以consumer数不要大于partition数

2,如果consumer group中的consumer线程数量比partition少,那么有的线程将会收到多个消息。并且不保证数据间的顺序性,kafka只保证在一个partition上数据是有序的,

3,增减consumer,broker,partition会导致rebalance,所以rebalance后consumer对应的partition会发生变化

4,High-level接口中获取不到数据的时候是会block的

关于consumer group(high api)的几点总结:
1,以consumer group为单位订阅 topic,每个consumer一起去消费一个topic;
2,consumer group 通过zookeeper来消费kafka集群中的消息(这个过程由zookeeper进行管理);
相对于low api自己管理offset,high api把offset的管理交给了zookeeper,但是high api并不是消费一次就在zookeeper中更新一次,而是每间隔一个(默认1000ms)时间更新一次offset,可能在重启消费者时拿到重复的消息。此外,当分区leader发生变更时也可能拿到重复的消息。因此在关闭消费者时最好等待一定时间(10s)然后再shutdown。
3,consumer group 设计的目的之一也是为了应用多线程同时去消费一个topic中的数据。

例子:

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;public class ConsumerTest implements Runnable {private KafkaStream m_stream;private int m_threadNumber;public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {m_threadNumber = a_threadNumber;m_stream = a_stream;}public void run() {ConsumerIterator<byte[], byte[]> it = m_stream.iterator();while (it.hasNext())System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));System.out.println("Shutting down Thread: " + m_threadNumber);}
}//配置连接zookeeper的信息
private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {Properties props = new Properties();props.put("zookeeper.connect", a_zookeeper);       //zookeeper连接地址props.put("group.id", a_groupId);          //consumer group的idprops.put("zookeeper.session.timeout.ms", "400");props.put("zookeeper.sync.time.ms", "200");props.put("auto.commit.interval.ms", "1000");return new ConsumerConfig(props);}//建立一个消费者线程池
public void run(int a_numThreads) {Map<String, Integer> topicCountMap = new HashMap<String, Integer>();topicCountMap.put(topic, new Integer(a_numThreads));Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);// now launch all the threads//executor = Executors.newFixedThreadPool(a_numThreads);// now create an object to consume the messages//int threadNumber = 0;for (final KafkaStream stream : streams) {executor.submit(new ConsumerTest(stream, threadNumber));threadNumber++;}
}//经过一段时间后关闭
try {Thread.sleep(10000);} catch (InterruptedException ie) {}example.shutdown();
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;public class ConsumerTest implements Runnable {private KafkaStream m_stream;private int m_threadNumber;public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {m_threadNumber = a_threadNumber;m_stream = a_stream;}public void run() {ConsumerIterator<byte[], byte[]> it = m_stream.iterator();while (it.hasNext())System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));System.out.println("Shutting down Thread: " + m_threadNumber);}
}//配置连接zookeeper的信息
private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {Properties props = new Properties();props.put("zookeeper.connect", a_zookeeper);       //zookeeper连接地址props.put("group.id", a_groupId);          //consumer group的idprops.put("zookeeper.session.timeout.ms", "400");props.put("zookeeper.sync.time.ms", "200");props.put("auto.commit.interval.ms", "1000");return new ConsumerConfig(props);}//建立一个消费者线程池
public void run(int a_numThreads) {Map<String, Integer> topicCountMap = new HashMap<String, Integer>();topicCountMap.put(topic, new Integer(a_numThreads));Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);// now launch all the threads//executor = Executors.newFixedThreadPool(a_numThreads);// now create an object to consume the messages//int threadNumber = 0;for (final KafkaStream stream : streams) {executor.submit(new ConsumerTest(stream, threadNumber));threadNumber++;}
}//经过一段时间后关闭
try {Thread.sleep(10000);} catch (InterruptedException ie) {}example.shutdown();

kafka consumer group总结相关推荐

  1. Kafka consumer group位移0ffset重设

    本文阐述如何使用Kafka自带的kafka-consumer-groups.sh脚本随意设置消费者组(consumer group)的位移.需要特别强调的是, 这是0.11.0.0版本提供的新功能且只 ...

  2. Kafka Consumer Group和Consumer Rebalance机制

    参考文章: Kafka Consumer Group和Consumer Rebalance机制 Kafka Consumer Group和Consumer Rebalance机制 - 简书 在新建一个 ...

  3. Kafka设计解析(四):Kafka Consumer解析--转

    原文地址:http://www.infoq.com/cn/articles/kafka-analysis-part-4?utm_source=infoq&utm_campaign=user_p ...

  4. Kafka设计解析(五): Kafka Consumer设计解析

    Kafka设计解析(五)- Kafka Consumer设计解析 大数据架构(郭俊_Jason) · 2015-09-18 08:24 点击上方 大数据架构   快速关注 Kafka Consumer ...

  5. Kafka设计解析(四):Kafka Consumer解析

    High Level Consumer 很多时候,客户程序只是希望从Kafka读取数据,不太关心消息offset的处理.同时也希望提供一些语义,例如同一条消息只被某一个Consumer消费(单播)或被 ...

  6. [Big Data - Kafka] Kafka设计解析(四):Kafka Consumer解析

    High Level Consumer 很多时候,客户程序只是希望从Kafka读取数据,不太关心消息offset的处理.同时也希望提供一些语义,例如同一条消息只被某一个Consumer消费(单播)或被 ...

  7. Kafka Consumer的两种接口(高阶低阶)

    Kafka Consumer接口 http://www.cnblogs.com/fxjwind/p/3794255.html 对于kafka的consumer接口,提供两种版本, high-level ...

  8. Apache Kafka Consumer 消费者集

    1.目标 在我们的上一篇文章中,我们讨论了Kafka Producer.今天,我们将讨论Kafka Consumer.首先,我们将看到什么是Kafka Consumer和Kafka Consumer的 ...

  9. Kafka consumer

    Kafka consumer consumer概览 消费者组 消费者组定义:消费者使用一个消费者组名(即group.id)来标记自己,topic的每条消息都只会被发送到每个订阅它的消费者组的一个消费者 ...

最新文章

  1. 统计学习导论 Chapter4--Classification
  2. 浅析Python3中的bytes和str类型
  3. SQLite的基本使用一
  4. 实现抢红包算法?如此简单
  5. Ubuntu 17 安装Chrome浏览器
  6. 大数据面试-02-大数据工程师面试题
  7. jQuery中eq与get的区别(整理)
  8. 代码生成器AutoGenerator
  9. 记录第一次用阿里云(Windows主机)部署SSM项目(Spring+SpringMVC+Mybatis)
  10. 如何在 Mac 上使用屏幕保护程序?
  11. SPP、RFB和ASPP
  12. Linux虚拟机下FTP服务器的搭建(详细)
  13. 4G工业路由器大气环境监测方案
  14. vue使用Export2Excel.js导出表格自定义样式(表头加分割斜线)(笔记)
  15. Vue项目中750设计稿px自动转化成rem方法(小白一个,记录自己遇到的小白问题,大家勿怪)
  16. MySQL-查看数据库
  17. GUNS框架图片上传详解
  18. python+gurobi
  19. Android基础知识(十)之多媒体
  20. 骚操作,VSCode上发布知乎

热门文章

  1. echarts-liquidfill 水球显示小数点
  2. Redis配置文件参数解释
  3. Python爬取HTTPS网站的图片
  4. transform三大属性之rotate(旋转)
  5. 【算法】算法分析与设计的基本方法
  6. Android之——获取进程总数、内存、任务列表
  7. 再获殊荣,鸿翼入选2021年新一代信息技术创新案例名单
  8. 有关Oracle数据库的几个神话
  9. AdaBoost从原理到实现
  10. 问题解决:Scrapy Unknown command: crawl