配置Kafka消息保留时间
生产者发送消息给kafka,消息被追加值日志文件并保留一定周期(基于配置)。本文探讨对Kafk主题配置消息保留时间。
基于时间保留
通过保留期属性,消息就有了TTL(time to live 生存时间)。到期后,消息被标记为删除,从而释放磁盘空间。对于kafka主题中所有消息具有相同的生存时间,但可以在创建主题之前设置属性,或对已存在的主题在运行时修改属性。
接下来我们将学习如何通过代理配置属性进行调整,以设置新主题的保留周期,并通过主题级配置在运行时控制它。
服务器级配置
Apache Kafka支持服务器级配置保留策略,我们可以通过配置以下三个基于时间的配置属性中的一个来进行优化:
- log.retention.hours
- log.retention.minutes
- log.retention.ms
注意:Kafka用更高精度值覆盖低精度值,所以log.retention.ms具有最高的优先级。
查看默认值
首先让我们检查保留时间的缺省值,在kafka目录下执行下面命令:
$ grep -i 'log.retention.[hms].*\=' config/server.properties
log.retention.hours=168
显示默认周期为7天。如果要设置消息保留周期为10分钟,可以通过config/server.properties配置文件的log.retention.minutes 属性进行配置。
log.retention.minutes=10
配置新主题
kafka提供了几个Shell脚本用于执行管理任务,利用它们创建工具脚本functions.sh。下面增加两个函数,分别为创建主题、展示配置:
function create_topic {topic_name="$1"bin/kafka-topics.sh --create --topic ${topic_name} --if-not-exists \--partitions 1 --replication-factor 1 \--zookeeper localhost:2181
}function describe_topic_config {topic_name="$1"./bin/kafka-configs.sh --describe --all \--bootstrap-server=0.0.0.0:9092 \--topic ${topic_name}
}
现在创建两个独立脚本,create-topic.sh、get-topic-retention-time.sh:
bash-5.1# cat create-topic.sh
#!/bin/bash
../functions.sh
topic_name="$1"
create_topic "${topic_name}"
exit $?
bash-5.1# cat get-topic-retention-time.sh
#!/bin/bash
../functions.sh
topic_name="$1"
describe_topic_config "${topic_name}" | awk 'BEGIN{IFS="=";IRS=" "} /^[ ]*retention.ms/{print $1}'
exit $?
简单解释下脚本的特殊符号:
$?-表示上一个命令执行状态.
$0-当前脚本的文件名称.
$#-在脚本中使用参数,如$1,$2分别表示第一个参数和第二参数.
$$-当前脚本的进程号,就是当前执行脚本的进程ID.
需要说明的是:describe_topic_config列出给定主题的所有属性配置,因此必须使用awk进行过滤,找出retention.ms property属性值。
现在可以启动kafka环境并验证retention.ms property属性配置:
bash-5.1# ./create-topic.sh test-topic
Created topic test-topic.
bash-5.1# ./get-topic-retention-time.sh test-topic
retention.ms=600000
通过脚本创建主题,列出描述,可以看到retention.ms 是 600000 (10分钟)。这是默认值,从之前设置server.properties文件中读出来的。
主题级配置
一旦kafka代理已经启动,log.retention.{hours|minutes|ms} 服务器级属性为只读属性。我们获得 retention.ms,但可以通过主题级参数进行调整。我们继续在functions.sh 脚本中增加方法配置主题属性:
function alter_topic_config {topic_name="$1"config_name="$2"config_value="$3"./bin/kafka-configs.sh --alter \--add-config ${config_name}=${config_value} \--bootstrap-server=0.0.0.0:9092 \--topic ${topic_name}
}
然后在alter-topic-config.sh 脚本使用它:
#!/bin/sh
../functions.shalter_topic_retention_config $1 $2 $3
exit $?
最后设置test-topic主题保存周期为5分钟,然后查看验证:
bash-5.1# ./alter-topic-config.sh test-topic retention.ms 300000
Completed updating config for topic test-topic.bash-5.1# ./get-topic-retention-time.sh test-topic
retention.ms=300000
验证
我们已经配置kafka主题的消息保留周期。现在来验证在超时后消息确实过期。
生产-消费
在 functions.sh脚本增加两个produce_message 和 consume_message 函数,其内部分别使用kafka-console-producer.sh 和 kafka-console-consumer.sh,分别用于产生/消费消息:
function produce_message {topic_name="$1"message="$2"echo "${message}" | ./bin/kafka-console-producer.sh \--bootstrap-server=0.0.0.0:9092 \--topic ${topic_name}
}function consume_message {topic_name="$1"timeout="$2"./bin/kafka-console-consumer.sh \--bootstrap-server=0.0.0.0:9092 \--from-beginning \--topic ${topic_name} \--max-messages 1 \--timeout-ms $timeout
}
我们看到消费总是从头开始读消息,因为我们需要消费者读主题中任何有效的消息。
下面创建独立的生产者函数:
bash-5.1# cat producer.sh
#!/bin/sh
../functions.sh
topic_name="$1"
message="$2"produce_message ${topic_name} ${message}
exit $?
最后创建消费者函数:
bash-5.1# cat consumer.sh
#!/bin/sh
../functions.sh
topic_name="$1"
timeout="$2"consume_message ${topic_name} $timeout
exit $?
消息过期
我们已经准备了工具函数,开始产生单个消息,然后消费两次:
bash-5.1# ./producer.sh "test-topic-2" "message1"
bash-5.1# ./consumer.sh test-topic-2 10000
message1
Processed a total of 1 messages
bash-5.1# ./consumer.sh test-topic-2 10000
message1
Processed a total of 1 messages
我们看到消费者重复消费所有有效消息。现在引入延迟机制延迟5分钟,然后再次消费消息:
bash-5.1# sleep 300 && ./consumer.sh test-topic 10000
[2021-02-06 21:55:00,896] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TimeoutException
Processed a total of 0 messages
与我们期望一致,消费者没有发现任何消息,因为消息已经超过了它的保存周期。
限制
在Kafka Broker内部,维护另一个名为log.retention.check.interval.ms的属性,用于决定检查消息是否过期的频率。因此,为了保持保留策略的有效性,必须确保log.retention.check.interval.ms的值低于retention.ms 的属性值。对于任何给定的主题都一样。
总结
本文探索了Apache Kafka消息基于时间的保留策略。通过创建简单的shell脚本来简化管理过程,接着我们创建了独立的消费者和生产者,以验证在保留期之后消息的过期场景。
配置Kafka消息保留时间相关推荐
- kafka消息过期时间设置(全局和特定topic)
文章目录 一.kafka 全局消息过期时间设置 1. 配置文件夹 2. 修改配置 3. 重启配置生效 二.针对特定topic设置过期时间 2.1. 配置文件夹 2.2. 执行设置命令 三.kafka过 ...
- kafka日志保留时间设置无效问题
背景 看了网上很多文档,说是要设置log.retention.hour等等参数. 默认是保留7天,但我实测下来发现日志根本没有任何变化. 目前我们的kafka,一天就有400多个G,保留七天大大增加了 ...
- Flink SQL空闲状态保留时间(idle state retention time)实现原理
前言 如果要列举Flink SQL新手有可能犯的错误,笔者认为其中之一就是忘记设置空闲状态保留时间导致状态爆炸.2021年的第一篇技术文,时间很紧张,聊聊这个简单的话题吧. 为什么要设置 如果我们在数 ...
- kafka消息服务的producer、broker、consumer的配置
2019独角兽企业重金招聘Python工程师标准>>> server.properties配置: server.properties中所有配置参数说明(解释)如下列表: 参数 说明( ...
- 19 kafka消息队列
文章目录 19 kafka消息队列 一.kafka介绍 1.消息队列基本介绍 2.常用的消息队列介绍 3.消息队列的应用场景 4.消息队列的两种模式 5.kafka的基本介绍 6.kafka的架构介绍 ...
- RabbitMq(十) 消息过期时间TTL介绍以及代码实现
概述: 在rabbitmq中我们可以给消息设定过期时间LLT(Time to Live),在消息发送后过期时间段内未被消费,则系统会将其删除,被删除的消息将会进入死信队列.关于设置消息的过期时间有两种 ...
- mysql8 设置binlog的保留时间
在 my.ini 文件中添加配置,设定保留时间: [mysqld] binlog_expire_logs_seconds=259200 顾名思义:单位为秒,259200 = 3天 重启 mysql 服 ...
- Kafka配置消息保存时间的方法
为什么80%的码农都做不了架构师?>>> 配置参考 然后不废话,直接贴最终的关键配置: # 想实现消息队列中保存2小时的消息,那么配置应该像这样: log.roll.hours ...
- kafka中修改某个主题中数据的保留时间
我们都知道,kafka中默认消息的保留时间是7天,若想更改,需在配置文件 server.properties里更改选项: log.retention.hours=168 1 但是有的时候我们需要对某一 ...
最新文章
- 【Verilog语法】PC-relatve branch 以及 Delay Slot 的含义
- npm-卸载并安装指定版本Angular CLI以及报错(npm ERR! Error: EACCES: permission denied)
- leetcode 2 --- 两数相加
- 5月5日——更改手机状态栏的背景颜色
- schedule php,PHP Laravel定时任务Schedule【干货】
- js浏览器页面生命周期
- C/C++语言的学习策略
- python包管理工具ports_如何从自制程序,分发,macports,pip卸载所有软件包?
- 奇迹s6ep3服务器修改技术,奇迹S6EP3服务端之让天鹰及装备不掉持久的
- 学计算机应用的必懂知识,学习计算机应用基础心得体会
- 视频号直播与抖音快手直播有哪些区别呢?
- 计算机类ei和sci期刊,请教大家计算机领域数据挖掘方面有哪些比较好中的EI期刊和SCI期刊 - 论文投稿 - 小木虫 - 学术 科研 互动社区...
- 尚硅谷智慧校园-SpringBoot最佳入手级项目
- html的悬停图片圆形,css3炫酷圆形图片鼠标滑过特效
- python 学生信息管理系统(二)
- http://www.189qq.cn/soft?54563.htm
- 证明爱因斯坦错了!诺贝尔物理学奖今年颁给量子纠缠,潘建伟博士导师加冕...
- Gradle介绍1-入门和IDEA整合(Gradle Wrapper)
- docker的下载与安装
- 配置tomcat下载sis,sisx文件