Kafka Java Client Code Example -- Advanced Usage (SimpleConsumer)
When should you use this advanced (low-level) approach?
- You need to read a message multiple times
- A process should consume only a specific subset of a topic's partitions
- You use transactions to ensure each message is processed exactly once
What are the drawbacks of this approach (calling the lower-level API)?
SimpleConsumer requires a lot of extra work that the consumer-group API handles for you:
- Track the offset of the last processed message in your application
- Determine the lead broker for a topic partition
- Handle lead-broker changes manually
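The first of these chores, manual offset tracking, boils down to persisting the next offset to read after each processed message so a restarted consumer can resume. A minimal, self-contained sketch follows; the `OffsetCheckpoint` class and file-based storage are illustrative only, not part of the Kafka API (a real deployment would use ZooKeeper, a database, or Kafka's own offsets topic):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Illustrative sketch: persist the next offset to read in a local file,
// so a restarted consumer can resume where it left off.
public class OffsetCheckpoint {
    private final Path file;

    public OffsetCheckpoint(Path file) {
        this.file = file;
    }

    // Read the last saved offset; start from 0 if nothing was saved yet.
    public long load() throws IOException {
        if (!Files.exists(file)) return 0L;
        String s = Files.readString(file).trim();
        return s.isEmpty() ? 0L : Long.parseLong(s);
    }

    // After processing the message at `offset`, record offset + 1 as the
    // next position to fetch.
    public void save(long nextOffset) throws IOException {
        Files.writeString(file, Long.toString(nextOffset));
    }

    public static void main(String[] args) throws IOException {
        Path p = Files.createTempFile("offsets", ".txt");
        OffsetCheckpoint cp = new OffsetCheckpoint(p);
        System.out.println(cp.load()); // no checkpoint yet: 0
        cp.save(42L + 1);              // processed the message at offset 42
        System.out.println(cp.load()); // resumes at 43
    }
}
```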
Steps for developing with the low-level API (SimpleConsumer):
- Query an active broker to determine the lead broker for the topic partition
- Determine the replica brokers for the topic partition
- Build fetch requests as needed
- Fetch the data
- Detect lead-broker changes and recover from them
Code example
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.Broker;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

/**
 * https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
 * @author Fung
 */
public class ConsumerSimpleExample {
    public static void main(String arg[]) {
        String[] args = {"20", "page_visits", "2", "172.168.63.233", "9092"};
        ConsumerSimpleExample example = new ConsumerSimpleExample();
        long maxReads = Long.parseLong(args[0]);
        String topic = args[1];
        int partition = Integer.parseInt(args[2]);
        List<String> seeds = new ArrayList<String>();
        seeds.add(args[3]);
        int port = Integer.parseInt(args[4]);
        try {
            example.run(maxReads, topic, partition, seeds, port);
        } catch (Exception e) {
            System.out.println("Oops:" + e);
            e.printStackTrace();
        }
    }

    private List<String> m_replicaBrokers = new ArrayList<String>();

    public ConsumerSimpleExample() {
        m_replicaBrokers = new ArrayList<String>();
    }

    public void run(long a_maxReads, String a_topic, int a_partition,
            List<String> a_seedBrokers, int a_port) throws Exception {
        // find the meta data about the topic and partition we are interested in
        PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
        if (metadata == null) {
            System.out.println("Can't find metadata for Topic and Partition. Exiting");
            return;
        }
        if (metadata.leader() == null) {
            System.out.println("Can't find Leader for Topic and Partition. Exiting");
            return;
        }
        String leadBroker = metadata.leader().host();
        String clientName = "Client_" + a_topic + "_" + a_partition;
        SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
        long readOffset = getLastOffset(consumer, a_topic, a_partition,
                kafka.api.OffsetRequest.LatestTime(), clientName);
        int numErrors = 0;
        while (a_maxReads > 0) {
            if (consumer == null) {
                consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
            }
            // Note: this fetchSize of 100000 might need to be increased if
            // large batches are written to Kafka
            FetchRequest req = new FetchRequestBuilder().clientId(clientName)
                    .addFetch(a_topic, a_partition, readOffset, 100000).build();
            FetchResponse fetchResponse = consumer.fetch(req);
            if (fetchResponse.hasError()) {
                numErrors++;
                // Something went wrong!
                short code = fetchResponse.errorCode(a_topic, a_partition);
                System.out.println("Error fetching data from the Broker:" + leadBroker
                        + " Reason: " + code);
                if (numErrors > 5)
                    break;
                if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                    // We asked for an invalid offset. For simple case ask for
                    // the last element to reset
                    readOffset = getLastOffset(consumer, a_topic, a_partition,
                            kafka.api.OffsetRequest.LatestTime(), clientName);
                    continue;
                }
                consumer.close();
                consumer = null;
                leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
                continue;
            }
            numErrors = 0;
            long numRead = 0;
            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
                long currentOffset = messageAndOffset.offset();
                if (currentOffset < readOffset) {
                    System.out.println("Found an old offset: " + currentOffset
                            + " Expecting: " + readOffset);
                    continue;
                }
                readOffset = messageAndOffset.nextOffset();
                ByteBuffer payload = messageAndOffset.message().payload();
                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                System.out.println(String.valueOf(messageAndOffset.offset())
                        + ": " + new String(bytes, "UTF-8"));
                numRead++;
                a_maxReads--;
            }
            if (numRead == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        if (consumer != null)
            consumer.close();
    }

    public static long getLastOffset(SimpleConsumer consumer, String topic,
            int partition, long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
                new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);
        if (response.hasError()) {
            System.out.println("Error fetching data Offset Data the Broker. Reason: "
                    + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }

    private String findNewLeader(String a_oldLeader, String a_topic,
            int a_partition, int a_port) throws Exception {
        for (int i = 0; i < 3; i++) {
            boolean goToSleep = false;
            PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
            if (metadata == null) {
                goToSleep = true;
            } else if (metadata.leader() == null) {
                goToSleep = true;
            } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
                // first time through if the leader hasn't changed give
                // ZooKeeper a second to recover
                // second time, assume the broker did recover before failover,
                // or it was a non-Broker issue
                goToSleep = true;
            } else {
                return metadata.leader().host();
            }
            if (goToSleep) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        System.out.println("Unable to find new leader after Broker failure. Exiting");
        throw new Exception("Unable to find new leader after Broker failure. Exiting");
    }

    private PartitionMetadata findLeader(List<String> a_seedBrokers,
            int a_port, String a_topic, int a_partition) {
        PartitionMetadata returnMetaData = null;
        loop: for (String seed : a_seedBrokers) {
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
                List<String> topics = Collections.singletonList(a_topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                TopicMetadataResponse resp = consumer.send(req);
                List<TopicMetadata> metaData = resp.topicsMetadata();
                for (TopicMetadata item : metaData) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        if (part.partitionId() == a_partition) {
                            returnMetaData = part;
                            break loop;
                        }
                    }
                }
            } catch (Exception e) {
                System.out.println("Error communicating with Broker [" + seed
                        + "] to find Leader for [" + a_topic + ", "
                        + a_partition + "] Reason: " + e);
            } finally {
                if (consumer != null)
                    consumer.close();
            }
        }
        if (returnMetaData != null) {
            m_replicaBrokers.clear();
            for (Broker replica : returnMetaData.replicas()) {
                m_replicaBrokers.add(replica.host());
            }
        }
        return returnMetaData;
    }
}
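The findNewLeader method in the listing above is a bounded retry-with-sleep loop: query metadata, sleep if no usable leader appeared, and fail loudly after a fixed number of attempts. The same control flow can be sketched in isolation by replacing the Kafka metadata query with a generic Supplier; the LeaderRetry class and all names in this sketch are illustrative, not part of the Kafka API:

```java
import java.util.Optional;
import java.util.function.Supplier;

// Illustrative sketch of the findNewLeader control flow: poll a lookup
// function a bounded number of times, sleeping between attempts, and
// throw if no usable result ever appears.
public class LeaderRetry {
    public static <T> T retry(Supplier<Optional<T>> lookup, int attempts, long sleepMs)
            throws InterruptedException {
        for (int i = 0; i < attempts; i++) {
            Optional<T> result = lookup.get();
            if (result.isPresent()) return result.get();
            Thread.sleep(sleepMs); // give the cluster a moment to elect a new leader
        }
        throw new IllegalStateException("Unable to find new leader after Broker failure");
    }

    public static void main(String[] args) throws InterruptedException {
        // Simulated metadata lookup: fails twice, then reports a new leader host.
        int[] calls = {0};
        Supplier<Optional<String>> lookup = () -> ++calls[0] < 3
                ? Optional.empty()
                : Optional.of("broker-2:9092");
        String leader = retry(lookup, 5, 10L);
        System.out.println(leader); // broker-2:9092
    }
}
```

Separating the retry policy from the lookup this way also makes the failover logic unit-testable without a running broker.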
Reference
https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
Reposted from: https://my.oschina.net/cloudcoder/blog/299222