本文我们探讨几种关于如何删除kafka主题数据的策略。

场景分析

在进入主题之前,先讨论下需要删除kafka主题数据的应用场景。

场景介绍

kafka消息在过了保留周期之后会被自动清除。但总有一些情况,需要立刻删除消息。

假设这样场景:已经开始给kafka主题生产消息的应用发现了缺陷,接着bug修复程序需要更新,这是kafka主题中已经了一些错误的消息。这样场景通常在开发环境,我们需要的就是快速批量删除这些消息。

模拟环境

为了模拟环境,首先在kafka目录中创建 purge-scenario主题:

$ bin/kafka-topics.sh \--create --topic purge-scenario --if-not-exists \--partitions 2 --replication-factor 1 \--zookeeper localhost:2181

接着使用shuf命令生成随机数,然后通过kafka-console-producer.sh发送kafka主题:

$ /usr/bin/shuf -i 1-100000 -n 50000000 \| tee -a /tmp/kafka-random-data \| bin/kafka-console-producer.sh \--bootstrap-server=0.0.0.0:9092 \--topic purge-scenario

shuf -i 1-100000 -n 50000000 :表示生成n个1-100000范围内随机数。

tee -a 前面命令结果写入文件 -a 表示追加;这是使用tee保存模拟数据是为了以后使用;

最后验证消费主题消息:

$ bin/kafka-console-consumer.sh \--bootstrap-server=0.0.0.0:9092 \--from-beginning --topic purge-scenario \--max-messages 3
76696
49425
1744
Processed a total of 3 messages

消息过期

在purge-scenario主题中的消息有缺省7天保留时间。为了删除消息,我们可以临时设置主题的 retention.ms 属性为10秒,然后等待其自动过期:

$ bin/kafka-configs.sh --alter \--add-config retention.ms=10000 \--bootstrap-server=0.0.0.0:9092 \--topic purge-scenario \&& sleep 10

现在验证消费是否过期:

$ bin/kafka-console-consumer.sh  \--bootstrap-server=0.0.0.0:9092 \--from-beginning --topic purge-scenario \--max-messages 1 --timeout-ms 1000
[2021-02-28 11:20:15,951] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TimeoutException
Processed a total of 0 messages

最后,我们要恢复主题的保留周期:

$ bin/kafka-configs.sh --alter \--add-config retention.ms=604800000 \--bootstrap-server=0.0.0.0:9092 \--topic purge-scenario

通过这个方法,kafka会删除主题所有分区的数据。

选择性删除消息

有时可能需要有选择性删除一个或多个主题的分区数据,可以使用kafka-delete-records.sh脚本实现。首先需要在delete-config.json 配置文件中指定分区级偏移量,我们打算分区的删除所有数据,partition指定分区,offset=-1:

{"partitions": [{"topic": "purge-scenario","partition": 1,"offset": -1}],"version": 1
}

接着处理删除记录:

$ bin/kafka-delete-records.sh \--bootstrap-server localhost:9092 \--offset-json-file delete-config.json

现在验证从分区0获取数据:

$ bin/kafka-console-consumer.sh \--bootstrap-server=0.0.0.0:9092 \--from-beginning --topic purge-scenario --partition=0 \--max-messages 1 --timeout-ms 100044017Processed a total of 1 messages

接着从分区1获取数据:

$ bin/kafka-console-consumer.sh \--bootstrap-server=0.0.0.0:9092 \--from-beginning --topic purge-scenario \--partition=1 \--max-messages 1 --timeout-ms 1000
[2021-02-28 11:48:03,548] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TimeoutException
Processed a total of 0 messages

删除重新创建主题

另外方法是通过删除主题删除其所有数据,然后重新创建主题。当然只有服务端设置delete.topic.enable属性为true才可能删除主题:

$ bin/kafka-server-start.sh config/server.properties \--override delete.topic.enable=true

可以通过kafka-topics.sh命令删除主题:

$ bin/kafka-topics.sh \--delete --topic purge-scenario \--zookeeper localhost:2181
Topic purge-scenario is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

现在验证主题:

$ bin/kafka-topics.sh --zookeeper localhost:2181 --list

如果确认主题已不存在,然后再重新创建主题。

总结

本文介绍了几种方式删除kafka主题数据。包括设置主题过期时间、删除主题所有数据及部分分区数据,到通过删除主题变相删除数据。

如何删除kafka主题数据相关推荐

  1. kafka删除主题数据和删除主题

    版权声明:本文为博主原创文章,转载注明出处. https://blog.csdn.net/forrest_ou/article/details/78999983 kafka使用confluent-3. ...

  2. 使用Java API创建(create),查看(describe),列举(list),删除(delete)Kafka主题(Topic)--转载...

    原文:http://blog.csdn.net/changong28/article/details/39325079 使用Kafka的同学都知道,我们每次创建Kafka主题(Topic)的时候可以指 ...

  3. AKHQ:用于Apache Kafka管理主题、主题数据、消费者组、模式注册表、连接等的Kafka GUI。。。

    参考文章:https://www.5axxw.com/wiki/content/q7nyiu AKHQ(以前称为KafkaHQ) 用于Apache Kafka管理主题.主题数据.消费者组.模式注册表. ...

  4. kafka如何彻底删除topic及数据

    前言: 删除kafka topic及其数据,严格来说并不是很难的操作.但是,往往给kafka 使用者带来诸多问题.项目组之前接触过多个开发者,发现都会偶然出现无法彻底删除kafka的情况.本文总结多个 ...

  5. kafka java api 删除_使用Java API创建(create),查看(describe),列举(list),删除(delete)Kafka主题(Topic)...

    使用Kafka的同学都知道,我们每次创建Kafka主题(Topic)的时候可以指定分区数和副本数等信息,如果将这些属性配置到server.properties文件中,以后调用Java API生成的主题 ...

  6. java上传kafka的方法_哪种方法是将所有数据从Kafka主题复制到接收器(文件或Hive表)的最佳方法?...

    我正在使用Kafka Consumer API将所有数据从Kafka主题复制到Hive表 . 为此,我使用HDFS作为中间步骤 . 我使用唯一的组ID并将偏移重置为"最早",以便从 ...

  7. CDH6 kafka如何彻底删除topic及数据

    标题CDH6 kafka如何彻底删除topic及数据 删除kafka topic及其数据,发现都会偶然出现无法彻底删除kafka的情况.本人亲测并总结了以下流程. 第一步: 设置 auto.creat ...

  8. 多个客户端抢夺命名管道_使用Kafka构建数据管道

    目标:使用Kafka和使用Redis的服务层编写数据管道. 先决条件 请根据您的操作系统安装以下组件: · Kafka · Zookeeper · Redis · Java 目标观众 本文针对的是正在 ...

  9. kafka监听topic消费_Kafka消费者-从Kafka读取数据

    (1)Customer和Customer Group (1)两种常用的消息模型 队列模型(queuing)和发布-订阅模型(publish-subscribe). 队列的处理方式是一组消费者从服务器读 ...

最新文章

  1. 使用模块化编译缩小 apk 体积
  2. Linux的find grep
  3. 看一下即将发布的JSF 2.3 Push支持
  4. easyui 插入中间行
  5. vue——懒加载(异步延迟和彻底懒加载)
  6. map的key可以试一个数组吗?_二维数组的 DP
  7. gridreport如何设置打印3次_pdfFactory如何设置限制打印和浏览文档权限
  8. linux nfs限制连接数,linux – 对NFS有一个有效的稳定性参数吗?
  9. 使用bootbox.js(二级务必提交书面和数字到数字中国)
  10. pcb钻孔披锋改善报告_【热点】大族激光:大客户下单积极,公司PCB业务有望延续增势...
  11. 触摸屏软键盘怎么调出来_触摸屏专用虚拟键盘下载
  12. JavaScript视频系统教程
  13. idea使用@Slf4j
  14. php微信调用摄像头拍视频,公众号调用摄像头录制视频
  15. 在网站的地址栏中的显示个性图标
  16. 阶段式软件研发项目管理工具
  17. ssm+redis整合(通过cache方式)
  18. ubuntu 18.04 桌面版应用、美化、配置备忘录[更新中]
  19. 只是为了好玩:linux之父林纳斯自传.pdf,读后感:《只是为了好玩:Linux之父林纳斯自传》...
  20. Til the Cows Come Home(简单的最短路)

热门文章

  1. 听见丨税改将刺激美国工厂增加更多机器人 日本神奈川县警方拟引进AI系统预测犯罪
  2. 【dede】安装完CMS源码后,会发现访问网站首页是报错 /templets/default/index.htm Not Found!
  3. 计算机类教育实验学院,【计算机专业论文】计算机专业实验教学革新路径探索(共3994字)...
  4. 请大家帮我找一下问题
  5. MuleSoft 4 配置连接数据库
  6. layui后台管理登录
  7. 华为企业级路由器配置与管理
  8. python的json格式输出_python中json格式数据输出实现方式
  9. MetaHuman结合第三人称角色,增加走路跑步动画
  10. 上海科技创新资源数据中心--免费下载知网、万方等平台论文