前言

kafka提供了消费组命令工具管理消费组:kafka-consumer-groups.sh,在0.11版本之后引入位移重置功能,重置策略如下(引用自官方文档):

--reset-offsets also has following scenarios to choose from (at least one scenario must be selected):

  • --to-datetime <String: datetime> : Reset offsets to offsets from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'
  • --to-earliest : Reset offsets to earliest offset.
  • --to-latest : Reset offsets to latest offset.
  • --shift-by <Long: number-of-offsets> : Reset offsets shifting current offset by 'n', where 'n' can be positive or negative.
  • --from-file : Reset offsets to values defined in CSV file.
  • --to-current : Resets offsets to current offset.
  • --by-duration <String: duration> : Reset offsets to offset by duration from current timestamp. Format: 'PnDTnHnMnS'
  • --to-offset : Reset offsets to a specific offset.

就是可以根据时间重置、重置到最小位移、最大位移...等场景。

本文主要聊一下根据时间重置消费位点时候,这个时间格式的问题。

根据时间重置消费位移

示例命令:

sh bin/kafka-consumer-groups.sh \--bootstrap-server 'localhost:9092' --group test_topic_consumer  \--topic test_topic \--reset-offsets --to-datetime 2021-11-29T12:00:00.000 \--execute  

为了看起来直观点,我加了"\"换行展示。

示例命令重置消费组为test_topic_consumer订阅test_topic的消费偏移为2021年11月29号中午12点整的时候消息位点。

实际上,结果不一定是这样,和时区有关。

协调世界时(UTC)

关于utc,查看维基百科

kafka脚本采用的是utc时间标准,与北京时间换算如下:

点击查看:图片来源

北京为东8时区,采用的是utc+08:00,这一段可以看维基百科,下面我复制出一部分,可以了解一下:

UTC+08:00是比世界协调时间快8小时的时区,理论上的位置在东经112度30分127度30分之间,是东盟标准时间的候选时区之一,居住在本时区的人数约有17亿人,占全世界人口的24%,是全世界人口最多的时区。

该时区亦为包括台湾、新加坡、马来西亚、中国、文莱、印尼中部及澳大利亚西部在内的绝大多数汉语使用者所居住的时区。所以互联网上的不少中文网站会使用该时区标记时间,而不论该网站所在地的官方时区为何。

所以,如果你现在使用的是北京时间,如果按示例重置位点,则实际上不是重置到12:00,而且是重置到了20:00的消息位点。

北京时间重置写法

如果是北京时间,则命令应该如下:

sh bin/kafka-consumer-groups.sh \--bootstrap-server 'localhost:9092' --group test_topic_consumer  \--topic test_topic \--reset-offsets --to-datetime 2021-11-29T12:00:00.000+08:00 \--execute  

即时间格式为:2021-11-29T12:00:00.000+08:00 ,表示重置到11月29号的12点。当然这里还有其它写法,下面是源码注释中支持的写法:

(1) yyyy-MM-dd'T'HH:mm:ss.SSS, ex: 2020-11-10T16:51:38.198
(2) yyyy-MM-dd'T'HH:mm:ss.SSSZ, ex: 2020-11-10T16:51:38.198+0800
(3) yyyy-MM-dd'T'HH:mm:ss.SSSX, ex: 2020-11-10T16:51:38.198+08
(4) yyyy-MM-dd'T'HH:mm:ss.SSSXX, ex: 2020-11-10T16:51:38.198+0800
(5) yyyy-MM-dd'T'HH:mm:ss.SSSXXX, ex: 2020-11-10T16:51:38.198+08:00

重置流程

kafka根据时间重置消费位点这一块逻辑也是相当简单:

  1. 获取指定topic的分区(也可以是所有topic)
  2. 将时间转换为对应的时间戳,此时转换的时候就是上面提到的时区问题
  3. 根据时间戳获取对应的消息位点
  4. 修改消费位点为对应的消息位点

下面这段代码是根据时间戳查询位点的逻辑:

    private def getLogTimestampOffsets(groupId: String, topicPartitions: Seq[TopicPartition], timestamp: java.lang.Long): Map[TopicPartition, LogOffsetResult] = {val timestampOffsets = topicPartitions.map { topicPartition =>// 指定根据时间戳类型查询位点,除此之外还有最小和最大日志位点等topicPartition -> OffsetSpec.forTimestamp(timestamp)}.toMap// 查询消息位点val offsets = adminClient.listOffsets(timestampOffsets.asJava,withTimeoutMs(new ListOffsetsOptions)).all.get//如果时间戳超过当前最新的消息时间了,就是查不到了,就是未知,下面会把未知这种转换为最新的消费位点val (successfulOffsetsForTimes, unsuccessfulOffsetsForTimes) =offsets.asScala.partition(_._2.offset != ListOffsetsResponse.UNKNOWN_OFFSET)val successfulLogTimestampOffsets = successfulOffsetsForTimes.map {case (topicPartition, listOffsetsResultInfo) => topicPartition -> LogOffsetResult.LogOffset(listOffsetsResultInfo.offset)}.toMapunsuccessfulOffsetsForTimes.foreach { entry =>println(s"\nWarn: Partition " + entry._1.partition() + " from topic " + entry._1.topic() +" is empty. Falling back to latest known offset.")}// 将查询到的和未知这种转换为最新的日志位点一起返回,准备重置successfulLogTimestampOffsets ++ getLogEndOffsets(groupId, unsuccessfulOffsetsForTimes.keySet.toSeq)}

可视化重置

如果觉得重置命令太麻烦,推荐一款可视化控制台:kafka-console-ui,github地址:https://github.com/xxd763795151/kafka-console-ui

新手用这个学习还是比较友好的:

Kafka使用脚本根据时间重置消费位移,格式你写对了么?相关推荐

  1. kafka linux 脚本测试,kafka shell命令操作

    1. 查看topic 选项说明: - --list :查看kafka所有的topic - --bootstrap-server : 连接kafka集群 - --hadoop102:9092:hadoo ...

  2. python向kafka发送json数据_python3实现从kafka获取数据,并解析为json格式,写入到mysql中...

    项目需求:将kafka解析来的日志获取到数据库的变更记录,按照订单的级别和订单明细级别写入数据库,一条订单的所有信息包括各种维度信息均保存在一条json中,写入mysql5.7中. 配置信息: [Gl ...

  3. 【Kafka】(四)Kafka使用 Consumer 接收消息消费

    Consumer概要 consumer中的关键术语: 消费者(consumer):从kafka中拉取数据并进行处理 消费者组(consumer group):一个消费者组由一个或者多个consumer ...

  4. kafka消息反复从头开始消费问题排查

    问题描述   最近线上的一个数据服务(服务B)出现了一个比较诡异的问题 ,该服务消费上游服务(服务A)产生的kafka消息数据,上线后一直运行平稳,最近一周在两次上线的时候出现了大量数据更新的情况,查 ...

  5. kafka通过脚本一次启动集群

    kafka 群起脚本kafka.sh #!/bin/bashcase $1 in "start"){for i in backup01 backup02 backup03do ec ...

  6. Kafka会不会重复消费

    本文来说下kafka会不会重复消费的问题.在单体架构时代,就存在着接口幂等性的问题,只不过到了分布式.高并发的场景之后,接口幂等性的问题会更加明显. 文章目录 概述 消息重复消费问题 解决方案 方案一 ...

  7. graylog+kafka+zookeeper(单机测试及源码),graylog收集kafka(脚本创建发布订阅方式)存储的消息(四)

    graylog+kafka+zookeeper(单机测试及源码),graylog收集kafka(脚本创建发布订阅方式)存储的消息(四) 问题背景 graylog+kafka+zookeeper(单机测 ...

  8. Kafka实现消息生产和消费

    文章目录 一.Kafka测试消息生产与消费 二.Java程序进行Kafka收发消息 1.消息生产者 2.消息消费者 一.Kafka测试消息生产与消费 # 首先创建一个主题 [root@192 kafk ...

  9. 【画框脚本】YOLO和COCO格式画框

    [画框脚本]YOLO和COCO格式画框 1. yolo格式画框 2. COCO格式画框 1. yolo格式画框 import cv2 import os import colorsys import ...

最新文章

  1. 用 C 语言开发一门编程语言 — 跨平台的可移植性
  2. Object.defineProperty与双向绑定、数据监听
  3. 环信集成 2---基于环信Demo3.0,实现单聊功能
  4. 详解Class类文件的结构(上)
  5. AX2012导Demo数据
  6. 测试网站的url脚本测试网站是否正常
  7. PAT (Basic Level) Practice1020 月饼
  8. 逻辑程序设计语言Prolog
  9. journalctl如何使用?
  10. nodeJS笔记参考菜鸟教程
  11. 阿里架构师墙裂推荐Java岗实战文档:Spring全家桶+Docker+Redis
  12. 调频连续波雷达(FMCW)测距/测速原理
  13. 关于CSS小三角的实现,小三角边框的实现,IE6下CSS小三角非透明的情况
  14. C++中的delete——读书笔记
  15. Coprime Triples——CodeChef - COPRIME3
  16. 移动设备IP地址的获取
  17. mysql查询昨天的日期_MySQL关于时间的一些查询,查询今天,昨天......
  18. Git忽略文件.gitignore详解
  19. Java小白修炼手册--锻体期--第二阶段:Java SE 核心API
  20. 北欧岗位制博士申请有多难?

热门文章

  1. Win8 消费者预览版(中文版)硬盘安装
  2. 服务器管理和维护建议
  3. 旧的机器装了XP以后频繁死机
  4. mongodb内嵌数组倒序和分页实现
  5. spring相关面试题合集
  6. 职场新人出奇制胜的9大职场观
  7. 软考 | 2019年下半年 软件设计师 下午试卷
  8. MySQL排序规则批量修改
  9. 威漫哨兵机器人_同为漫威机器人,哨兵机器人和奥创之间谁更厉害?答案显然是他!...
  10. 商业合伙人和技术合伙人之间的矛盾与解决-评估商业合伙人和技术合伙人的能力和价值