import org.apache.kafka.clients.consumer.KafkaConsumer; //导入方法依赖的package包/类

/**

* Find the start offsets for the processing windows. We uses kafka 0.10.1.1 that does not support

* KafkaConsumer.

*/

public static Map getProcessingStartOffsets(KafkaConsumer kafkaConsumer,

String brokerStatsTopic,

long startTimestampInMillis) {

List partitionInfos = kafkaConsumer.partitionsFor(brokerStatsTopic);

LOG.info("Get partition info for {} : {} partitions", brokerStatsTopic, partitionInfos.size());

List topicPartitions = partitionInfos.stream()

.map(partitionInfo -> new TopicPartition(partitionInfo.topic(), partitionInfo.partition()))

.collect(Collectors.toList());

Map endOffsets = kafkaConsumer.endOffsets(topicPartitions);

Map beginningOffsets = kafkaConsumer.beginningOffsets(topicPartitions);

Map offsets = new HashMap<>();

for (TopicPartition tp : topicPartitions) {

kafkaConsumer.unsubscribe();

LOG.info("assigning {} to kafkaconsumer", tp);

List tps = new ArrayList<>();

tps.add(tp);

kafkaConsumer.assign(tps);

long endOffset = endOffsets.get(tp);

long beginningOffset = beginningOffsets.get(tp);

long offset = Math.max(endOffsets.get(tp) - 10, beginningOffset);

ConsumerRecord record = retrieveOneMessage(kafkaConsumer, tp, offset);

BrokerStats brokerStats = OperatorUtil.deserializeBrokerStats(record);

if (brokerStats != null) {

long timestamp = brokerStats.getTimestamp();

while (timestamp > startTimestampInMillis) {

offset = Math.max(beginningOffset, offset - 5000);

record = retrieveOneMessage(kafkaConsumer, tp, offset);

brokerStats = OperatorUtil.deserializeBrokerStats(record);

if (brokerStats == null) {

break;

}

timestamp = brokerStats.getTimestamp();

}

}

offsets.put(tp, offset);

LOG.info("{}: offset = {}, endOffset = {}, # of to-be-processed messages = {}",

tp, offset, endOffset, endOffset - offset);

}

return offsets;

}

java kafka consumer_Java KafkaConsumer.endOffsets方法代码示例相关推荐

  1. cdate在java中_Java Calendar.add方法代码示例

    本文整理汇总了Java中java.util.Calendar.add方法的典型用法代码示例.如果您正苦于以下问题:Java Calendar.add方法的具体用法?Java Calendar.add怎 ...

  2. java hashmap putall_Java ConcurrentHashMap.putAll方法代码示例

    import java.util.concurrent.ConcurrentHashMap; //导入方法依赖的package包/类 /** * 生成更新Sql * @param entity * @ ...

  3. java中setattribute_Java Files.setAttribute方法代码示例

    import java.nio.file.Files; //导入方法依赖的package包/类 @Override public void setFileHiddenAttribute( String ...

  4. java default locale_Java JSON.defaultLocale方法代码示例

    import com.alibaba.fastjson.JSON; //导入方法依赖的package包/类 public void test_time() throws Exception { lon ...

  5. java sql xml_Java ResultSet.getSQLXML方法代码示例

    import java.sql.ResultSet; //导入方法依赖的package包/类 protected Object processColumn(ResultSet rs, int inde ...

  6. java get timestamp_Java Timestamp.getTime方法代码示例

    import java.sql.Timestamp; //导入方法依赖的package包/类 /** * Tests fix for BUG#5874, timezone correction goe ...

  7. java的setvisible_Java Container.setVisible方法代码示例

    import java.awt.Container; //导入方法依赖的package包/类 public static void loadOptions(final String name, fin ...

  8. java byte xml_Java XMLInputSource.setByteStream方法代码示例

    import org.apache.xerces.xni.parser.XMLInputSource; //导入方法依赖的package包/类 /** * Resolves an external p ...

  9. JAVA中rootpanel_Java JPanel.setFocusCycleRoot方法代码示例

    import javax.swing.JPanel; //导入方法依赖的package包/类 @Override public void actionPerformed(ActionEvent e) ...

最新文章

  1. 黄聪:Python+NLTK自然语言处理学习(三):计算机自动学习机制
  2. NSAutoReleasePool使用中drain和release的区别
  3. 优化SQL步骤—— explain分析执行计划 (explain 之 id)
  4. Java字符串String比较不要用==原因
  5. 2018 焦作站亚洲区域赛校内选拔赛题解
  6. 离散实验偏序关系满足实验报告C语言,离散数学实验三:偏序关系中盖住关系的求取及格论中有补格的判定...
  7. keil生成hex文件找不到_骚操作!用Python把公众号文章打包成pdf文件,再也不怕找不到了...
  8. Spring Boot Actuator:在其顶部具有MVC层的自定义端点
  9. android 勿扰模式代码,Android N Zen Mode (勿扰模式)设置流程
  10. redis的zset使用(java)——存取List< Object>
  11. iOS - Animation 八种方法
  12. 虚拟服务器配置推荐,如何选择虚拟主机配置
  13. 【图像融合】基于matlab GUI小波变换彩色图像融合(带面板)【含Matlab源码 782期】
  14. 肌电信号分析相关链接分享
  15. Fedora:使用Fedora 28在Firefox上启用h264视频(搜狐/优酷视频)
  16. Metasploit扫描漏洞模块
  17. 木兰词·拟古决绝词柬友
  18. CUDA C编程(三十)OpenACC的使用
  19. 电脑卡,电脑卡到不行?原因和解决方法都在这里了!
  20. Datewhale组队学习——深度学习推荐系统(1)

热门文章

  1. SqlcommandBuilder Sqlcommand sqldataAdapter DataSet DataTable
  2. Linux 拷贝文件:cp 命令
  3. css 背景图片和渐变色并存
  4. 《C++开发工程师岗位必备知识点一》
  5. 硬核“毕业证”:5 位本科生带自研处理器芯片毕业,包云岗解读“一生一芯”计划...
  6. 【深度学习】——过拟合的处理方法
  7. easyui输入框样式_EasyUI动态改变输入框width
  8. Java源码转C 源码的五款最佳工具
  9. 海思低功耗IP门铃方案(Hi3518EV200+Hi1131s+MCU+LiteOS)
  10. ubuntu下安装android sdk,ubuntu下配置Android sdk