java kafka consumer_Java KafkaConsumer.endOffsets方法代码示例
import org.apache.kafka.clients.consumer.KafkaConsumer; //导入方法依赖的package包/类
/**
* Find the start offsets for the processing windows. We uses kafka 0.10.1.1 that does not support
* KafkaConsumer.
*/
public static Map getProcessingStartOffsets(KafkaConsumer kafkaConsumer,
String brokerStatsTopic,
long startTimestampInMillis) {
List partitionInfos = kafkaConsumer.partitionsFor(brokerStatsTopic);
LOG.info("Get partition info for {} : {} partitions", brokerStatsTopic, partitionInfos.size());
List topicPartitions = partitionInfos.stream()
.map(partitionInfo -> new TopicPartition(partitionInfo.topic(), partitionInfo.partition()))
.collect(Collectors.toList());
Map endOffsets = kafkaConsumer.endOffsets(topicPartitions);
Map beginningOffsets = kafkaConsumer.beginningOffsets(topicPartitions);
Map offsets = new HashMap<>();
for (TopicPartition tp : topicPartitions) {
kafkaConsumer.unsubscribe();
LOG.info("assigning {} to kafkaconsumer", tp);
List tps = new ArrayList<>();
tps.add(tp);
kafkaConsumer.assign(tps);
long endOffset = endOffsets.get(tp);
long beginningOffset = beginningOffsets.get(tp);
long offset = Math.max(endOffsets.get(tp) - 10, beginningOffset);
ConsumerRecord record = retrieveOneMessage(kafkaConsumer, tp, offset);
BrokerStats brokerStats = OperatorUtil.deserializeBrokerStats(record);
if (brokerStats != null) {
long timestamp = brokerStats.getTimestamp();
while (timestamp > startTimestampInMillis) {
offset = Math.max(beginningOffset, offset - 5000);
record = retrieveOneMessage(kafkaConsumer, tp, offset);
brokerStats = OperatorUtil.deserializeBrokerStats(record);
if (brokerStats == null) {
break;
}
timestamp = brokerStats.getTimestamp();
}
}
offsets.put(tp, offset);
LOG.info("{}: offset = {}, endOffset = {}, # of to-be-processed messages = {}",
tp, offset, endOffset, endOffset - offset);
}
return offsets;
}
java kafka consumer_Java KafkaConsumer.endOffsets方法代码示例相关推荐
- cdate在java中_Java Calendar.add方法代码示例
本文整理汇总了Java中java.util.Calendar.add方法的典型用法代码示例.如果您正苦于以下问题:Java Calendar.add方法的具体用法?Java Calendar.add怎 ...
- java hashmap putall_Java ConcurrentHashMap.putAll方法代码示例
import java.util.concurrent.ConcurrentHashMap; //导入方法依赖的package包/类 /** * 生成更新Sql * @param entity * @ ...
- java中setattribute_Java Files.setAttribute方法代码示例
import java.nio.file.Files; //导入方法依赖的package包/类 @Override public void setFileHiddenAttribute( String ...
- java default locale_Java JSON.defaultLocale方法代码示例
import com.alibaba.fastjson.JSON; //导入方法依赖的package包/类 public void test_time() throws Exception { lon ...
- java sql xml_Java ResultSet.getSQLXML方法代码示例
import java.sql.ResultSet; //导入方法依赖的package包/类 protected Object processColumn(ResultSet rs, int inde ...
- java get timestamp_Java Timestamp.getTime方法代码示例
import java.sql.Timestamp; //导入方法依赖的package包/类 /** * Tests fix for BUG#5874, timezone correction goe ...
- java的setvisible_Java Container.setVisible方法代码示例
import java.awt.Container; //导入方法依赖的package包/类 public static void loadOptions(final String name, fin ...
- java byte xml_Java XMLInputSource.setByteStream方法代码示例
import org.apache.xerces.xni.parser.XMLInputSource; //导入方法依赖的package包/类 /** * Resolves an external p ...
- JAVA中rootpanel_Java JPanel.setFocusCycleRoot方法代码示例
import javax.swing.JPanel; //导入方法依赖的package包/类 @Override public void actionPerformed(ActionEvent e) ...
最新文章
- 黄聪:Python+NLTK自然语言处理学习(三):计算机自动学习机制
- NSAutoReleasePool使用中drain和release的区别
- 优化SQL步骤—— explain分析执行计划 (explain 之 id)
- Java字符串String比较不要用==原因
- 2018 焦作站亚洲区域赛校内选拔赛题解
- 离散实验偏序关系满足实验报告C语言,离散数学实验三:偏序关系中盖住关系的求取及格论中有补格的判定...
- keil生成hex文件找不到_骚操作!用Python把公众号文章打包成pdf文件,再也不怕找不到了...
- Spring Boot Actuator:在其顶部具有MVC层的自定义端点
- android 勿扰模式代码,Android N Zen Mode (勿扰模式)设置流程
- redis的zset使用(java)——存取List< Object>
- iOS - Animation 八种方法
- 虚拟服务器配置推荐,如何选择虚拟主机配置
- 【图像融合】基于matlab GUI小波变换彩色图像融合(带面板)【含Matlab源码 782期】
- 肌电信号分析相关链接分享
- Fedora:使用Fedora 28在Firefox上启用h264视频(搜狐/优酷视频)
- Metasploit扫描漏洞模块
- 木兰词·拟古决绝词柬友
- CUDA C编程(三十)OpenACC的使用
- 电脑卡,电脑卡到不行?原因和解决方法都在这里了!
- Datewhale组队学习——深度学习推荐系统(1)
热门文章
- SqlcommandBuilder Sqlcommand sqldataAdapter DataSet DataTable
- Linux 拷贝文件:cp 命令
- css 背景图片和渐变色并存
- 《C++开发工程师岗位必备知识点一》
- 硬核“毕业证”:5 位本科生带自研处理器芯片毕业,包云岗解读“一生一芯”计划...
- 【深度学习】——过拟合的处理方法
- easyui输入框样式_EasyUI动态改变输入框width
- Java源码转C 源码的五款最佳工具
- 海思低功耗IP门铃方案(Hi3518EV200+Hi1131s+MCU+LiteOS)
- ubuntu下安装android sdk,ubuntu下配置Android sdk