2019独角兽企业重金招聘Python工程师标准>>>

什么时间使用高级应用?

  1. 针对一个消息读取多次
  2. 在一个process中,仅仅处理一个topic中的一组partitions
  3. 使用事务,确保每个消息只被处理一次

使用高级应用(调用较底层函数)的缺点?

SimpleConsumer需要做很多额外的工作(在以groups方式进行消息处理时不需要)

  1. 在应用程序中跟踪上次消息处理的offset
  2. 确定一个topic partition的lead broker
  3. 手工处理broker leander的改变

使用底层函数(SimpleConsumer)开发的步骤

  1. 通过active broker,确定topic partition的lead broker
  2. 确定topic partition的replicat brokers
  3. 根据需要,创建数据请求
  4. 抓取数据
  5. 识别lead brokder改变并进行恢复

代码示例

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.Broker;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;/*** https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example* @author Fung**/
public class ConsumerSimpleExample {public static void main(String arg[]) {String[] args={"20","page_visits","2","172.168.63.233","9092"};ConsumerSimpleExample example = new ConsumerSimpleExample();long maxReads = Long.parseLong(args[0]);String topic = args[1];int partition = Integer.parseInt(args[2]);List<String> seeds = new ArrayList<String>();seeds.add(args[3]);int port = Integer.parseInt(args[4]);try {example.run(maxReads, topic, partition, seeds, port);} catch (Exception e) {System.out.println("Oops:" + e);e.printStackTrace();}}private List<String> m_replicaBrokers = new ArrayList<String>();public ConsumerSimpleExample() {m_replicaBrokers = new ArrayList<String>();}public void run(long a_maxReads, String a_topic, int a_partition,List<String> a_seedBrokers, int a_port) throws Exception {// find the meta data about the topic and partition we are interested in//PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic,a_partition);if (metadata == null) {System.out.println("Can't find metadata for Topic and Partition. Exiting");return;}if (metadata.leader() == null) {System.out.println("Can't find Leader for Topic and Partition. Exiting");return;}String leadBroker = metadata.leader().host();String clientName = "Client_" + a_topic + "_" + a_partition;SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port,100000, 64 * 1024, clientName);long readOffset = getLastOffset(consumer, a_topic, a_partition,kafka.api.OffsetRequest.LatestTime(), clientName);int numErrors = 0;while (a_maxReads > 0) {if (consumer == null) {consumer = new SimpleConsumer(leadBroker, a_port, 100000,64 * 1024, clientName);}// Note: this fetchSize of 100000 might need to be increased if// large batches are written to KafkaFetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(a_topic, a_partition, readOffset, 100000).build();FetchResponse fetchResponse = consumer.fetch(req);if (fetchResponse.hasError()) {numErrors++;// Something went wrong!short code = fetchResponse.errorCode(a_topic, a_partition);System.out.println("Error fetching data from the Broker:"+ leadBroker + " Reason: " + code);if (numErrors > 5)break;if (code == ErrorMapping.OffsetOutOfRangeCode()) {// We asked for an invalid offset. For simple case ask for// the last element to resetreadOffset = getLastOffset(consumer, a_topic, a_partition,kafka.api.OffsetRequest.LatestTime(), clientName);continue;}consumer.close();consumer = null;leadBroker = findNewLeader(leadBroker, a_topic, a_partition,a_port);continue;}numErrors = 0;long numRead = 0;for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {long currentOffset = messageAndOffset.offset();if (currentOffset < readOffset) {System.out.println("Found an old offset: " + currentOffset+ " Expecting: " + readOffset);continue;}readOffset = messageAndOffset.nextOffset();ByteBuffer payload = messageAndOffset.message().payload();byte[] bytes = new byte[payload.limit()];payload.get(bytes);System.out.println(String.valueOf(messageAndOffset.offset())+ ": " + new String(bytes, "UTF-8"));numRead++;a_maxReads--;}if (numRead == 0) {try {Thread.sleep(1000);} catch (InterruptedException ie) {}}}if (consumer != null)consumer.close();}public static long getLastOffset(SimpleConsumer consumer, String topic,int partition, long whichTime, String clientName) {TopicAndPartition topicAndPartition = new TopicAndPartition(topic,partition);Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(),clientName);OffsetResponse response = consumer.getOffsetsBefore(request);if (response.hasError()) {System.out.println("Error fetching data Offset Data the Broker. Reason: "+ response.errorCode(topic, partition));return 0;}long[] offsets = response.offsets(topic, partition);return offsets[0];}private String findNewLeader(String a_oldLeader, String a_topic,int a_partition, int a_port) throws Exception {for (int i = 0; i < 3; i++) {boolean goToSleep = false;PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port,a_topic, a_partition);if (metadata == null) {goToSleep = true;} else if (metadata.leader() == null) {goToSleep = true;} else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host())&& i == 0) {// first time through if the leader hasn't changed give// ZooKeeper a second to recover// second time, assume the broker did recover before failover,// or it was a non-Broker issue//goToSleep = true;} else {return metadata.leader().host();}if (goToSleep) {try {Thread.sleep(1000);} catch (InterruptedException ie) {}}}System.out.println("Unable to find new leader after Broker failure. Exiting");throw new Exception("Unable to find new leader after Broker failure. Exiting");}private PartitionMetadata findLeader(List<String> a_seedBrokers,int a_port, String a_topic, int a_partition) {PartitionMetadata returnMetaData = null;loop: for (String seed : a_seedBrokers) {SimpleConsumer consumer = null;try {consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024,"leaderLookup");List<String> topics = Collections.singletonList(a_topic);TopicMetadataRequest req = new TopicMetadataRequest(topics);TopicMetadataResponse resp = consumer.send(req);List<TopicMetadata> metaData = resp.topicsMetadata();for (TopicMetadata item : metaData) {for (PartitionMetadata part : item.partitionsMetadata()) {if (part.partitionId() == a_partition) {returnMetaData = part;break loop;}}}} catch (Exception e) {System.out.println("Error communicating with Broker [" + seed+ "] to find Leader for [" + a_topic + ", "+ a_partition + "] Reason: " + e);} finally {if (consumer != null)consumer.close();}}if (returnMetaData != null) {m_replicaBrokers.clear();for (Broker replica : returnMetaData.replicas()) {m_replicaBrokers.add(replica.host());}}return returnMetaData;}
}

参考

https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

转载于:https://my.oschina.net/cloudcoder/blog/299222

Kafka JAVA客户端代码示例--高级应用相关推荐

  1. java 生成客户端代码_swagger-codegen生成java客户端代码

    前后端分离的时候,需要建立契约,Swagger可达到该目的(略). 建立Rest接口后,通过swagger-codegen项目可以自动生成对应的客户端代码(c++.php.java.js.node等等 ...

  2. CA双向认证完整实现步骤(附java客户端代码)

    一.基础概念 注:以下概念除专业名词外,均为个人理解,不具备权威性. 1.什么是系统安全管理 置于公网的系统,通常都需要一定的安全管理,据我个人理解,这里的安全管理主要分三个方面: 一是应用内的权限控 ...

  3. ssl证书CA双向认证完整实现步骤(附java客户端代码)(好文章!)

    一.基础概念 注:以下概念除专业名词外,均为个人理解,不具备权威性. 1.什么是系统安全管理 置于公网的系统,通常都需要一定的安全管理,据我个人理解,这里的安全管理主要分三个方面: 一是应用内的权限控 ...

  4. 冒泡排序的代码java,Java冒泡排序代码示例

    Java冒泡排序代码示例 代码如下:↓ package com.coding.learn; import java.util.Arrays; /** * @author 老菜鸟 * @version ...

  5. SFTP客户端代码示例

    参考链接:SFTP客户端代码示例 操作系统:Windows7/8,VS2013 环境:libssh2 1.4.3.zlib-1.2.8.openssl-1.0.1g 原文: "从http:/ ...

  6. Kafka Java客户端Stream API

    Kafka Stream概念及初识高层架构图 Kafka Stream是Apache Kafka从0.10版本引入的一个新Feature,它提供了对存储于Kafka内的数据进行流式处理和分析的功能.简 ...

  7. kafka Java客户端之 consumer API 消费消息

    背景:我使用docker-compose 搭建的kafka服务 kafka的简单介绍以及docker-compose部署单主机Kafka集群 使用consumer API消费指定Topic里面的消息 ...

  8. es文本分析java代码_Elasticsearch系列---Java客户端代码Demo

    前言 前面历经33篇内容的讲解,与ES的请求操作都是在Kibana平台上用Restful请求完成的,一直没发布Java或python的客户端代码,Restful才是运用.理解ES核心功能最直接的表达方 ...

  9. java爬虫代码示例_那些让你代码思维和能力有较大的提升Java源码

    来源:www.cnblogs.com/jiagou/p/9270070.html 对于学习J2EE的框架有很大的帮助,代码里使用了各种设计模式.事件机制.Java8语法.代码量也很小,web服务使用N ...

最新文章

  1. CatBoost讲解
  2. 上证50ETF申赎清单
  3. JForum二次开发(一)
  4. 5G 落地进入爆发期,是时候让毫米波登场了
  5. Javascript版的Repeater控件实现
  6. git 查看某些文档的历史版本_10分钟了解git
  7. 1.概率论-组合分析
  8. [文摘20090622]HP大中华区总裁孙振耀退休十五天后九大感言
  9. 计算机开机桌面黑,电脑能正常启动但屏幕全黑原因和解决方法
  10. 图数据库初探——6. Nebula Graph安装和简单使用
  11. jquery 漂浮广告
  12. 阿里云 Aliplayer高级功能介绍(三):多字幕 1
  13. 学习管理系统五大好处
  14. excel中读取数据拟合幂律分布
  15. 排序算法为什么要求稳定性
  16. 专精特新中小企业认定标准
  17. 4-9-6 tf.keras入门(附带复现cvpr论文流程与代码)
  18. TypeScript故事—如何使用TypeScript在NPM上发布自定义钩子
  19. KDE声响效劳器──aRts
  20. 使用mui制作一个web2app类型的app

热门文章

  1. SpringBoot自动装载
  2. 一周一论文(翻译)——[VLDB 18] Chi:分布式流处理系统下可扩展的、可编程的控制计划模块
  3. Spark详解(五):Spark作业执行原理
  4. 定制linux版本,Instalinux:在线自由定制 Linux 发行版
  5. 手动制造报错_一个订单管理系统帮你轻松应对复杂的生产订单管理
  6. linux进程假死的原因_一次Spring Boot假死诊断
  7. Redis的数据类型详解
  8. apache服务出现Forbidden 403问题的解决方法总结
  9. 这是一个测试rss的内容哦
  10. 【图算法】Dijkstra算法及变形