LogManager: the class that manages Kafka's data logs, including cleanup, flush, and related operations.

Log: one object per topic-partition log (abbreviated below as "tplog").

LogSegment: the object for each segment file under a tplog directory.

FileMessageSet: wraps the file channel of each log file.

base offset: the absolute offset, within the partition, of a segment's first message.

OffsetIndex: the memory-mapped map over each index file, storing relative offsets and the corresponding file positions.

A topic is split into partitions, which are distributed across the brokers.

A partition holds multiple log files, and each log file has one index file.

The log file holds the actual data; the index file maps relative offsets of that data to positions inside the log file. An index entry is only written once per chunk of data (a sparse index), which keeps the index file from growing too large.
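To make the sparse index concrete, here is a minimal, self-contained sketch (not Kafka's actual OffsetIndex code; the segment name, index entries, and offsets are hypothetical) of how a read maps an absolute offset to a starting position: the segment file name gives the base offset, the index yields the last entry at or before the target's relative offset, and the log file is then scanned forward from that position.

// A simplified sparse-index lookup, for illustration only.
object SparseIndexLookupSketch {
  // one index entry: a relative offset plus the byte position of that message in the .log file
  case class IndexEntry(relativeOffset: Int, position: Long)

  // return the file position to start scanning from for a target absolute offset
  def lookupPosition(baseOffset: Long, index: Vector[IndexEntry], targetOffset: Long): Long = {
    val relative = (targetOffset - baseOffset).toInt
    // last indexed entry whose relative offset is <= the target; because the index is sparse,
    // the reader scans the log file forward from this position to reach the exact message
    index.takeWhile(_.relativeOffset <= relative).lastOption.map(_.position).getOrElse(0L)
  }

  def main(args: Array[String]): Unit = {
    val baseOffset = 368769L  // e.g. a segment file named 00000000000000368769.log
    val index = Vector(IndexEntry(0, 0L), IndexEntry(3, 497L), IndexEntry(7, 1024L))
    // absolute offset 368774 -> relative offset 5 -> start scanning at byte position 497
    println(lookupPosition(baseOffset, index, 368774L))
  }
}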

1. Initialization:

val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
val LockFile = ".lock"
val InitialTaskDelayMs = 30*1000
private val logCreationOrDeletionLock = new Object
// all Log objects: one Log per topic-partition
private val logs = new Pool[TopicAndPartition, Log]()
// validate the log directories and take a file (channel) lock on each of them
createAndValidateLogDirs(logDirs)
private val dirLocks = lockLogDirs(logDirs)
private val recoveryPointCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, RecoveryPointCheckpointFile)))).toMap
// walk every log, build a Log object for it, and run log recovery (check the recovery point)
loadLogs()

The main method, loadLogs:

if (cleanShutdownFile.exists) {
  // the previous shutdown finished cleanly, so no recovery is needed this time
  debug("Found clean shutdown file. " +
        "Skipping recovery for all logs in data directory: " + dir.getAbsolutePath)
} else {
  // log recovery itself is being performed by `Log` class during initialization
  brokerState.newState(RecoveringFromUncleanShutdown)
}
// read the recovery-point checkpoint file for this log directory
val recoveryPoints = this.recoveryPointCheckpoints(dir).read
val jobsForDir = for {
  dirContent <- Option(dir.listFiles).toList
  logDir <- dirContent if logDir.isDirectory
} yield {
  Utils.runnable {
    debug("Loading log '" + logDir.getName + "'")
    // parse the topic and partition from the directory name
    val topicPartition = Log.parseTopicPartitionName(logDir.getName)
    // look up the topic's custom config, falling back to the default config if there is none
    val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
    val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L)
    val current = new Log(logDir, config, logRecoveryPoint, scheduler, time)
    val previous = this.logs.put(topicPartition, current)
    // check for a duplicate topic+partition
    if (previous != null) {
      throw new IllegalArgumentException(
        "Duplicate log directories found: %s, %s!".format(current.dir.getAbsolutePath, previous.dir.getAbsolutePath))
    }
  }
}
// submit the runnable above for every logDir, creating the Log objects and adding them to the log pool
jobs(cleanShutdownFile) = jobsForDir.map(pool.submit).toSeq

The Log constructor invoked here initializes the log files and index files.

Its main method: loadSegments

1. Handle .swap files: a log file is reloaded by renaming it back, an index file is deleted.

2. Load the .log and .index files, rebuilding any index that is missing.

private def loadSegments() {
  // create the log directory if it doesn't exist
  dir.mkdirs()
  // first do a pass through the files in the log directory and remove any temporary files
  // and complete any interrupted swap operations
  for(file <- dir.listFiles if file.isFile) {
    if(!file.canRead)
      throw new IOException("Could not read file " + file)
    val filename = file.getName
    if(filename.endsWith(DeletedFileSuffix) || filename.endsWith(CleanedFileSuffix)) {
      // if the file ends in .deleted or .cleaned, delete it
      file.delete()
    } else if(filename.endsWith(SwapFileSuffix)) {
      // we crashed in the middle of a swap operation, to recover:
      // if a log, swap it in and delete the .index file
      // if an index just delete it, it will be rebuilt
      // i.e. an index is simply deleted; a log is reloaded by renaming it back, and its existing index is removed
      val baseName = new File(Utils.replaceSuffix(file.getPath, SwapFileSuffix, ""))
      if(baseName.getPath.endsWith(IndexFileSuffix)) {
        file.delete()
      } else if(baseName.getPath.endsWith(LogFileSuffix)) {
        // delete the index
        val index = new File(Utils.replaceSuffix(baseName.getPath, LogFileSuffix, IndexFileSuffix))
        index.delete()
        // complete the swap operation
        val renamed = file.renameTo(baseName)
        if(renamed)
          info("Found log file %s from interrupted swap operation, repairing.".format(file.getPath))
        else
          throw new KafkaException("Failed to rename file %s.".format(file.getPath))
      }
    }
  }
  // now do a second pass and load all the .log and .index files
  for(file <- dir.listFiles if file.isFile) {
    val filename = file.getName
    if(filename.endsWith(IndexFileSuffix)) {
      // if it is an index file, make sure it has a corresponding .log file; delete it if there is none
      val logFile = new File(file.getAbsolutePath.replace(IndexFileSuffix, LogFileSuffix))
      if(!logFile.exists) {
        warn("Found an orphaned index file, %s, with no corresponding log file.".format(file.getAbsolutePath))
        file.delete()
      }
    } else if(filename.endsWith(LogFileSuffix)) {
      // if it's a log file, load the corresponding log segment
      // the file name is the segment's start offset
      val start = filename.substring(0, filename.length - LogFileSuffix.length).toLong
      val hasIndex = Log.indexFilename(dir, start).exists
      // build a LogSegment for each log file in the tplog, holding its FileMessageSet,
      // OffsetIndex and base offset
      val segment = new LogSegment(dir = dir,
                                   startOffset = start,
                                   indexIntervalBytes = config.indexInterval,
                                   maxIndexSize = config.maxIndexSize,
                                   rollJitterMs = config.randomSegmentJitter,
                                   time = time)
      if(!hasIndex) {
        error("Could not find index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath))
        // rebuild the index file and the in-memory index (both use the memory-mapped file channel mechanism)
        segment.recover(config.maxMessageSize)
      }
      segments.put(start, segment)
    }
  }
  if(logSegments.size == 0) {
    // no existing segments, create a new mutable segment beginning at offset 0
    segments.put(0L, new LogSegment(dir = dir,
                                    startOffset = 0,
                                    indexIntervalBytes = config.indexInterval,
                                    maxIndexSize = config.maxIndexSize,
                                    rollJitterMs = config.randomSegmentJitter,
                                    time = time))
  } else {
    recoverLog()
    // reset the index size of the currently active log segment to allow more entries
    activeSegment.index.resize(config.maxIndexSize)
  }
  // sanity check the index file of every segment to ensure we don't proceed with a corrupt segment
  for (s <- logSegments)
    s.index.sanityCheck()
}

-----------------------------Initialization complete---------------------------------

The startup method schedules three tasks:

1.cleanupLogs

2.flushDirtyLogs

3.checkpointRecoveryPointOffsets

1.cleanupLogs

Cleanup uses two methods: one by age (based on the segment's last-modified time) and one by size (starting from the oldest segment, a segment is deleted only while its size is no larger than the remaining difference diff).

  private def cleanupExpiredSegments(log: Log): Int = {
    val startMs = time.milliseconds
    // the predicate compares (time the cleanup runs - the segment's last-modified time) with the
    // configured retention time and returns true when it is exceeded, i.e. it deletes segments
    // whose last modification is older than the retention time
    log.deleteOldSegments(startMs - _.lastModified > log.config.retentionMs)
  }
  /**
   * Deletion rule: once the tplog exceeds the size threshold, walk from the oldest segment and
   * delete each file whose size still fits within the remaining difference (diff). Stop as soon
   * as the current log file is larger than diff (the principle is to wait until a whole file can
   * be removed).
   *
   * Runs through the log removing segments until the size of the log
   * is at least logRetentionSize bytes in size
   */
  private def cleanupSegmentsToMaintainSize(log: Log): Int = {
    // nothing to do when the configured size is negative or the log is smaller than the limit
    if(log.config.retentionSize < 0 || log.size < log.config.retentionSize)
      return 0
    var diff = log.size - log.config.retentionSize
    def shouldDelete(segment: LogSegment) = {
      if(diff - segment.size >= 0) {
        // the amount that still has to be removed covers this whole log file, so delete it
        diff -= segment.size
        true
      } else {
        false
      }
    }
    log.deleteOldSegments(shouldDelete)
  }
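As a worked example with hypothetical numbers: if retention.bytes is 1000 MB and the log holds three 600 MB segments (1800 MB total), then diff = 800 MB. The oldest 600 MB segment fits within diff and is deleted, leaving diff = 200 MB; the next 600 MB segment no longer fits, so deletion stops even though the log (1200 MB) is still above the limit; the next whole segment is only removed once diff grows to cover it.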

Parameters:

How often the cleanup runs that deletes segments whose last-modified time exceeds the configured retention:

val logCleanupIntervalMs = props.getLongInRange("log.retention.check.interval.ms", 5*60*1000, (1, Long.MaxValue))

The size-based cleanup parameter: the upper limit on a log's size (-1 disables size-based retention):

val logRetentionBytes = props.getLong("log.retention.bytes", -1)

  def deleteOldSegments(predicate: LogSegment => Boolean): Int = {
    // find any segments that match the user-supplied predicate UNLESS it is the final segment
    // and it is empty (since we would just end up re-creating it)
    val lastSegment = activeSegment
    // collect the list of segments to delete: those matching the predicate (e.g. expired),
    // excluding an empty active segment
    val deletable = logSegments.takeWhile(s => predicate(s) && (s.baseOffset != lastSegment.baseOffset || s.size > 0))
    val numToDelete = deletable.size
    if(numToDelete > 0) {
      lock synchronized {
        // we must always have at least one segment, so if we are going to delete all the segments, create a new one first
        if(segments.size == numToDelete)
          roll()
        // remove the segments for lookups: deleteSegment removes each one from the segment map,
        // renames its files with the .deleted suffix, and deletes them asynchronously
        deletable.foreach(deleteSegment(_))
      }
    }
    numToDelete
  }

2.flushDirtyLogs

Flushing is governed by the number of messages written and by the time elapsed since the last flush:

/* the maximum time in ms that a message in any topic is kept in memory before flushed to disk */
val logFlushIntervalMs = props.getLong("log.flush.interval.ms", logFlushSchedulerIntervalMs)

/**
 * Flush any log which has exceeded its flush interval and has unwritten messages.
 */
private def flushDirtyLogs() = {
  debug("Checking for dirty logs to flush...")
  for ((topicAndPartition, log) <- logs) {
    try {
      val timeSinceLastFlush = time.milliseconds - log.lastFlushTime
      debug("Checking if flush is needed on " + topicAndPartition.topic + " flush interval  " + log.config.flushMs +
            " last flushed " + log.lastFlushTime + " time since last flush: " + timeSinceLastFlush)
      if(timeSinceLastFlush >= log.config.flushMs)
        log.flush
    } catch {
      case e: Throwable =>
        error("Error flushing topic " + topicAndPartition.topic, e)
    }
  }
}

@threadsafe
def flush() {
  LogFlushStats.logFlushTimer.time {
    log.flush()
    index.flush()
  }
}

3.checkpointRecoveryPointOffsets

checkpointRecoveryPointOffsets records the recovery point of each log directory so that, on startup, the broker does not have to recover every log and regenerate every index.

It iterates per log directory; each log directory contains multiple tplogs.

  /**
   * Make a checkpoint for all logs in provided directory.
   */
  private def checkpointLogsInDir(dir: File): Unit = {
    // get all tplogs under the current dir, as an Option[Map[TopicAndPartition, Log]]
    val recoveryPoints = this.logsByDir.get(dir.toString)
    if (recoveryPoints.isDefined) {
      // mapValues replaces each map value (the Log object, referenced by _) with its recoveryPoint,
      // so write receives (topicAndPartition -> recoveryPoint) pairs; write puts the offsets into a
      // temporary file, deletes the old checkpoint and renames the temp file to the checkpoint file
      this.recoveryPointCheckpoints(dir).write(recoveryPoints.get.mapValues(_.recoveryPoint))
    }
  }
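For reference, the recovery-point-offset-checkpoint file written here is a small text file: a format version on the first line, the number of entries on the second, and then one "topic partition offset" line per topic-partition, roughly as follows (topic name and offsets are hypothetical):

0
2
my-topic 0 1523
my-topic 1 980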

LogManager also wires in the log compaction feature:

    if(cleanerConfig.enableCleaner)
      cleaner.startup()  // log compaction
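As a usage note (the configuration names below come from Kafka's configuration, not from the code quoted above): cleaner.startup() only takes effect when the broker is started with log.cleaner.enable=true, and compaction applies to topics whose cleanup.policy is set to compact; topics that keep the default delete policy are handled by the time/size based cleanupLogs path described earlier.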
