Problem description: a Flink cluster produces data to and consumes data from Kafka as part of a big-data pipeline. To recover from processing failures, the job is configured with a restart strategy and a checkpointing policy. After the project went live, more and more devices were onboarded, Kafka topics were being created dynamically at a growing rate, and the Flink job started to fail.
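
For orientation, the restart/checkpoint setup was along these lines — a minimal sketch using the DataStream API; the class name, intervals, and retry counts are illustrative, not the production values:

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JobConfigSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 60 s with exactly-once guarantees (interval is illustrative)
        env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);
        // Give each checkpoint up to 10 minutes before it is declared failed
        env.getCheckpointConfig().setCheckpointTimeout(600_000L);

        // On failure, restart up to 3 times with a 10 s pause between attempts
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)));

        // ... Kafka source, transformations, and Kafka sink are wired up here ...

        env.execute("device-data-pipeline"); // job name is an assumption
    }
}

Even with restarts in place, every recovery attempt kept hitting the same checkpoint failure: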

java.io.IOException: Could not perform checkpoint 87 for operator Sink: Unnamed (34/90)#99.
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1274)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:493)
    at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)
    at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 87 for operator Sink: Unnamed (34/90)#99. Failure reason: Checkpoint was declined.
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:265)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:170)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:348)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:233)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:206)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:186)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:605)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:315)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$14(StreamTask.java:1329)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1315)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1258)
    ... 22 more
Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Expiring 27 record(s) for topic-call_XAucjhIN-0:120000 ms has passed since batch creation
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1429)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:1117)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:1014)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:102)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:345)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.snapshotState(FlinkKafkaProducer.java:1122)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:87)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:219)
    ... 33 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 27 record(s) for topic-call_XAucjhIN-0:120000 ms has passed since batch creation
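
The root cause turned out to be on the broker side (shown next), but it is worth noting that the 120000 ms in this TimeoutException is the Kafka producer's delivery.timeout.ms (120000 ms is its default): batches sat unsent past that deadline while the broker was unhealthy. If short broker hiccups are expected, the sink's producer timeouts can be loosened. A sketch for the legacy FlinkKafkaProducer used in the stack trace above — the broker addresses and timeout values are assumptions, and the topic name is taken from the error:

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class KafkaSinkSketch {
    static FlinkKafkaProducer<String> buildSink() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka101:9092,kafka102:9092,kafka103:9092"); // assumed brokers
        // Let batches wait longer before expiring (default delivery.timeout.ms is 120000)
        props.setProperty("delivery.timeout.ms", "300000");
        props.setProperty("request.timeout.ms", "120000");
        // For EXACTLY_ONCE sinks this must comfortably exceed the checkpoint interval
        props.setProperty("transaction.timeout.ms", "900000");

        return new FlinkKafkaProducer<>(
                "topic-call_XAucjhIN",          // topic from the error above
                new SimpleStringSchema(),
                props,
                null,                           // no custom partitioner
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE,
                FlinkKafkaProducer.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
    }
}

This only buys the job time during an outage; the actual fix is to remove the reason the broker stopped accepting writes.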

One broker in the Kafka cluster had gone down; its server log showed the following error:

[2022-08-01 14:55:22,453] ERROR Error while writing to checkpoint file /home/kafka-logs/fan_sink_29-1/leader-epoch-checkpoint (kafka.server.LogDirFailureChannel)
java.io.FileNotFoundException: /home/kafka-logs/topic_min/leader-epoch-checkpoint.tmp (Too many open files)
    at java.io.FileOutputStream.open0(Native Method)
    at java.io.FileOutputStream.open(FileOutputStream.java:270)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:162)
    at kafka.server.checkpoints.CheckpointFile.liftedTree1$1(CheckpointFile.scala:94)
    at kafka.server.checkpoints.CheckpointFile.write(CheckpointFile.scala:92)
    at kafka.server.checkpoints.LeaderEpochCheckpointFile.write(LeaderEpochCheckpointFile.scala:70)
    at kafka.server.epoch.LeaderEpochFileCache.flush(LeaderEpochFileCache.scala:292)
    at kafka.server.epoch.LeaderEpochFileCache.assign(LeaderEpochFileCache.scala:61)
    at kafka.log.Log.$anonfun$maybeAssignEpochStartOffset$1(Log.scala:1368)
    at kafka.log.Log.$anonfun$maybeAssignEpochStartOffset$1$adapted(Log.scala:1367)
    at scala.Option.foreach(Option.scala:437)
    at kafka.log.Log.maybeAssignEpochStartOffset(Log.scala:1367)
    at kafka.cluster.Partition.$anonfun$makeLeader$1(Partition.scala:592)
    at kafka.cluster.Partition.makeLeader(Partition.scala:547)
    at kafka.server.ReplicaManager.$anonfun$makeLeaders$5(ReplicaManager.scala:1568)
    at kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)
    at scala.collection.mutable.HashMap$Node.foreachEntry(HashMap.scala:633)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:499)
    at kafka.server.ReplicaManager.makeLeaders(ReplicaManager.scala:1566)
    at kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:1411)
    at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:258)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:171)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:74)
    at java.lang.Thread.run(Thread.java:748)

Each topic partition keeps several files open on the broker (log segments, index files, checkpoint files), so as topics were created dynamically the broker eventually exhausted its open-file limit. The fix is to raise that limit, as follows:

// Raise the OS-level per-user open-file limit (applies to login sessions)
[root@kafka101 ~]# vi /etc/security/limits.conf

root soft nofile 65536
root hard nofile 65536
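
These limits.conf entries are applied by PAM at login time, so they can be verified from a fresh login shell — a quick check:

// Verify in a new login session (limits.conf does not affect already-running processes)
[root@kafka101 ~]# ulimit -n
65536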

// Locate Kafka's systemd unit file [looking for kafka.service] — limits.conf only applies to login sessions, not to systemd-managed services, so the unit file needs its own limit

[root@kafka103 ~]# cd /

[root@kafka103 ~]# find / -name "*kafka*"

/etc/systemd/system/kafka.service

[root@kafka103 ~]# cd /etc/systemd/system/
// Edit the unit file to raise the service's maximum number of open files

[root@kafka103 ~]# vi kafka.service

# Raise the maximum number of open file descriptors (add this under the [Service] section)
LimitNOFILE=65535
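
For context, the surrounding unit file looks roughly like this; everything except the LimitNOFILE line is an assumption about this installation (paths and options will differ):

[Unit]
Description=Apache Kafka broker

[Service]
Type=simple
# Assumed install path; keep the ExecStart already present in the real unit
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
Restart=on-failure
# The one line this fix actually adds:
LimitNOFILE=65535

[Install]
WantedBy=multi-user.target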

// Reload systemd so the edited unit file is picked up
[root@kafka103 ~]# systemctl daemon-reload

// Restart Kafka so the new limit takes effect

[root@kafka103 ~]# systemctl stop kafka

[root@kafka103 ~]# systemctl start kafka

// Find the Kafka process ID

[root@kafka103 system]# ps -ef | grep kafka
Here the Kafka process ID turns out to be 19694.

// Confirm the limits the running process actually has
[root@kafka103 system]# cat /proc/19694/limits
Limit                     Soft Limit           Hard Limit           Units
Max cpu time              unlimited            unlimited            seconds
Max file size             unlimited            unlimited            bytes
Max data size             unlimited            unlimited            bytes
Max stack size            8388608              unlimited            bytes
Max core file size        0                    unlimited            bytes
Max resident set          unlimited            unlimited            bytes
Max processes             2062355              2062355              processes
Max open files            65535                65535                files
Max locked memory         65536                65536                bytes
Max address space         unlimited            unlimited            bytes
Max file locks            unlimited            unlimited            locks
Max pending signals       2062355              2062355              signals
Max msgqueue size         819200               819200               bytes
Max nice priority         0                    0
Max realtime priority     0                    0
Max realtime timeout      unlimited            unlimited           

Max open files has now increased to 65535.

With that, the "too many open files" problem is resolved.
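
To judge how much headroom the broker keeps under the new limit as topics continue to grow, the descriptor count can be sampled directly (19694 is the PID found above):

// Count the file descriptors the broker currently holds open
[root@kafka103 system]# ls /proc/19694/fd | wc -l
// Roughly the same view via lsof (its line count also includes a header and non-FD rows)
[root@kafka103 system]# lsof -p 19694 | wc -l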
