Table of Contents

  • Log file organization
  • Writing the log file
    • AddRecord
    • EmitPhysicalRecord
  • Reading the log file
    • SkipToInitialBlock
    • ReadPhysicalRecord
    • ReadRecord

Log File Organization

The previous post in this series, "leveldb source code walkthrough — overall architecture", introduced the log file; this post examines how the log file is organized.

leveldb/doc/log_format.md describes the log file layout:

The log file contents are a sequence of 32KB blocks. The only exception is that the tail of the file may contain a partial block.

Each block consists of a sequence of records

A log file is a sequence of 32KB blocks; each block contains one or more records, laid out as follows:

record :=
  checksum: uint32     // crc32c of type and data[] ; little-endian
  length: uint16       // little-endian
  type: uint8          // One of FULL, FIRST, MIDDLE, LAST
  data: uint8[length]

As the format shows, a record has one of four types; because user records are variable-length and may be split across blocks, the type field distinguishes the fragments:

The FULL record contains the contents of an entire user record.

FIRST, MIDDLE, LAST are types used for user records that have been split into multiple fragments (typically because of block boundaries). FIRST is the type of the first fragment of a user record, LAST is the type of the last fragment of
a user record, and MIDDLE is the type of all interior fragments of a user record.

A more detailed description of the layout can be found in doc/log_format.md. The rest of this post walks through writing and reading the log file.

Writing the Log File

The code for writing the log file lives in db/log_writer.h and db/log_writer.cc:

class Writer {
 public:
  // Create a writer that will append data to "*dest".
  // "*dest" must be initially empty.
  // "*dest" must remain live while this Writer is in use.
  explicit Writer(WritableFile* dest);

  // Create a writer that will append data to "*dest".
  // "*dest" must have initial length "dest_length".
  // "*dest" must remain live while this Writer is in use.
  Writer(WritableFile* dest, uint64_t dest_length);

  Writer(const Writer&) = delete;
  Writer& operator=(const Writer&) = delete;

  ~Writer();

  Status AddRecord(const Slice& slice);

 private:
  Status EmitPhysicalRecord(RecordType type, const char* ptr, size_t length);

  WritableFile* dest_;
  int block_offset_;  // Current offset in block

  // crc32c values for all supported record types.  These are
  // pre-computed to reduce the overhead of computing the crc of the
  // record type stored in the header.
  uint32_t type_crc_[kMaxRecordType + 1];
};

The main class here is leveldb::log::Writer. Its key members are dest_ and block_offset_, plus the AddRecord member function. leveldb::log::Writer relies on leveldb::WritableFile, which is not analyzed in this post.

Every constructor of leveldb::log::Writer calls the static function InitTypeCrc to initialize type_crc_. The following focuses on the AddRecord and EmitPhysicalRecord functions.

AddRecord

Status Writer::AddRecord(const Slice& slice) {
  const char* ptr = slice.data();
  size_t left = slice.size();

  // Fragment the record if necessary and emit it.  Note that if slice
  // is empty, we still want to iterate once to emit a single
  // zero-length record
  Status s;
  bool begin = true;
  do {
    const int leftover = kBlockSize - block_offset_;
    assert(leftover >= 0);
    if (leftover < kHeaderSize) {
      // Switch to a new block
      if (leftover > 0) {
        // Fill the trailer (literal below relies on kHeaderSize being 7)
        static_assert(kHeaderSize == 7, "");
        dest_->Append(Slice("\x00\x00\x00\x00\x00\x00", leftover));
      }
      block_offset_ = 0;
    }

    // Invariant: we never leave < kHeaderSize bytes in a block.
    assert(kBlockSize - block_offset_ - kHeaderSize >= 0);

    const size_t avail = kBlockSize - block_offset_ - kHeaderSize;
    const size_t fragment_length = (left < avail) ? left : avail;

    RecordType type;
    const bool end = (left == fragment_length);
    if (begin && end) {
      type = kFullType;
    } else if (begin) {
      type = kFirstType;
    } else if (end) {
      type = kLastType;
    } else {
      type = kMiddleType;
    }

    s = EmitPhysicalRecord(type, ptr, fragment_length);
    ptr += fragment_length;
    left -= fragment_length;
    begin = false;
  } while (s.ok() && left > 0);
  return s;
}
  1. Get the data pointer and length from slice, assigning them to ptr and left
  2. Loop until all the data has been written:
    1. Compute the space left in the current block; if it is less than kHeaderSize, pad the trailer with \x00 bytes and reset block_offset_ = 0
    2. Compute the capacity remaining after subtracting kHeaderSize, take the smaller of that and left as the fragment length, and determine whether this fragment reaches the end of the record
    3. Choose the record type from the begin and end flags
    4. Call EmitPhysicalRecord to write the fragment to the log file
    5. Advance ptr and decrease left by the fragment length

EmitPhysicalRecord

Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr,
                                  size_t length) {
  assert(length <= 0xffff);  // Must fit in two bytes
  assert(block_offset_ + kHeaderSize + length <= kBlockSize);

  // Format the header
  char buf[kHeaderSize];
  buf[4] = static_cast<char>(length & 0xff);
  buf[5] = static_cast<char>(length >> 8);
  buf[6] = static_cast<char>(t);

  // Compute the crc of the record type and the payload.
  uint32_t crc = crc32c::Extend(type_crc_[t], ptr, length);
  crc = crc32c::Mask(crc);  // Adjust for storage
  EncodeFixed32(buf, crc);

  // Write the header and the payload
  Status s = dest_->Append(Slice(buf, kHeaderSize));
  if (s.ok()) {
    s = dest_->Append(Slice(ptr, length));
    if (s.ok()) {
      s = dest_->Flush();
    }
  }
  block_offset_ += kHeaderSize + length;
  return s;
}

What it does is construct the header, then call WritableFile's Append and Flush to push the record out to the physical log file.

Reading the Log File

The code for reading the log file lives in db/log_reader.h and db/log_reader.cc:

class Reader {
 public:
  // Interface for reporting errors.
  class Reporter {
   public:
    virtual ~Reporter();

    // Some corruption was detected.  "bytes" is the approximate number
    // of bytes dropped due to the corruption.
    virtual void Corruption(size_t bytes, const Status& status) = 0;
  };

  // Create a reader that will return log records from "*file".
  // "*file" must remain live while this Reader is in use.
  //
  // If "reporter" is non-null, it is notified whenever some data is
  // dropped due to a detected corruption.  "*reporter" must remain
  // live while this Reader is in use.
  //
  // If "checksum" is true, verify checksums if available.
  //
  // The Reader will start reading at the first record located at physical
  // position >= initial_offset within the file.
  Reader(SequentialFile* file, Reporter* reporter, bool checksum,
         uint64_t initial_offset);

  Reader(const Reader&) = delete;
  Reader& operator=(const Reader&) = delete;

  ~Reader();

  // Read the next record into *record.  Returns true if read
  // successfully, false if we hit end of the input.  May use
  // "*scratch" as temporary storage.  The contents filled in *record
  // will only be valid until the next mutating operation on this
  // reader or the next mutation to *scratch.
  bool ReadRecord(Slice* record, std::string* scratch);

  // Returns the physical offset of the last record returned by ReadRecord.
  //
  // Undefined before the first call to ReadRecord.
  uint64_t LastRecordOffset();

 private:
  // Extend record types with the following special values
  enum {
    kEof = kMaxRecordType + 1,
    // Returned whenever we find an invalid physical record.
    // Currently there are three situations in which this happens:
    // * The record has an invalid CRC (ReadPhysicalRecord reports a drop)
    // * The record is a 0-length record (No drop is reported)
    // * The record is below constructor's initial_offset (No drop is reported)
    kBadRecord = kMaxRecordType + 2
  };

  // Skips all blocks that are completely before "initial_offset_".
  //
  // Returns true on success. Handles reporting.
  bool SkipToInitialBlock();

  // Return type, or one of the preceding special values
  unsigned int ReadPhysicalRecord(Slice* result);

  // Reports dropped bytes to the reporter.
  // buffer_ must be updated to remove the dropped bytes prior to invocation.
  void ReportCorruption(uint64_t bytes, const char* reason);
  void ReportDrop(uint64_t bytes, const Status& reason);

  SequentialFile* const file_;
  Reporter* const reporter_;
  bool const checksum_;
  char* const backing_store_;
  Slice buffer_;
  bool eof_;  // Last Read() indicated EOF by returning < kBlockSize

  // Offset of the last record returned by ReadRecord.
  uint64_t last_record_offset_;
  // Offset of the first location past the end of buffer_.
  uint64_t end_of_buffer_offset_;

  // Offset at which to start looking for the first record to return
  uint64_t const initial_offset_;

  // True if we are resynchronizing after a seek (initial_offset_ > 0). In
  // particular, a run of kMiddleType and kLastType records can be silently
  // skipped in this mode
  bool resyncing_;
};

Several member variables of leveldb::log::Reader deserve explanation:

char* const backing_store_;         // backing storage for the data referenced by buffer_
Slice buffer_;                      // in-memory buffer for reading physical records; holds a full kBlockSize unless EOF is reached
uint64_t last_record_offset_;       // offset of the last record returned by ReadRecord
uint64_t end_of_buffer_offset_;     // current read offset (first location past the end of buffer_)
uint64_t const initial_offset_;     // offset at which to start looking for the first record to return
bool resyncing_;                    // true while resynchronizing after a seek (initial_offset_ > 0)

SkipToInitialBlock

bool Reader::SkipToInitialBlock() {
  const size_t offset_in_block = initial_offset_ % kBlockSize;
  uint64_t block_start_location = initial_offset_ - offset_in_block;

  // Don't search a block if we'd be in the trailer
  if (offset_in_block > kBlockSize - 6) {
    block_start_location += kBlockSize;
  }

  end_of_buffer_offset_ = block_start_location;

  // Skip to start of first block that can contain the initial record
  if (block_start_location > 0) {
    Status skip_status = file_->Skip(block_start_location);
    if (!skip_status.ok()) {
      ReportDrop(block_start_location, skip_status);
      return false;
    }
  }

  return true;
}

This adjusts and seeks according to initial_offset_:

  1. Compute the in-block offset offset_in_block and the block start address block_start_location
  2. If the in-block offset falls inside the trailer, advance the block start address to the next block
  3. Set end_of_buffer_offset_ = block_start_location
  4. Call file_->Skip(block_start_location)

ReadPhysicalRecord

unsigned int Reader::ReadPhysicalRecord(Slice* result) {
  while (true) {
    if (buffer_.size() < kHeaderSize) {
      if (!eof_) {
        // Last read was a full read, so this is a trailer to skip
        buffer_.clear();
        Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
        end_of_buffer_offset_ += buffer_.size();
        if (!status.ok()) {
          buffer_.clear();
          ReportDrop(kBlockSize, status);
          eof_ = true;
          return kEof;
        } else if (buffer_.size() < kBlockSize) {
          eof_ = true;
        }
        continue;
      } else {
        // Note that if buffer_ is non-empty, we have a truncated header at the
        // end of the file, which can be caused by the writer crashing in the
        // middle of writing the header. Instead of considering this an error,
        // just report EOF.
        buffer_.clear();
        return kEof;
      }
    }

    // Parse the header
    const char* header = buffer_.data();
    const uint32_t a = static_cast<uint32_t>(header[4]) & 0xff;
    const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff;
    const unsigned int type = header[6];
    const uint32_t length = a | (b << 8);
    if (kHeaderSize + length > buffer_.size()) {
      size_t drop_size = buffer_.size();
      buffer_.clear();
      if (!eof_) {
        ReportCorruption(drop_size, "bad record length");
        return kBadRecord;
      }
      // If the end of the file has been reached without reading |length| bytes
      // of payload, assume the writer died in the middle of writing the record.
      // Don't report a corruption.
      return kEof;
    }

    if (type == kZeroType && length == 0) {
      // Skip zero length record without reporting any drops since
      // such records are produced by the mmap based writing code in
      // env_posix.cc that preallocates file regions.
      buffer_.clear();
      return kBadRecord;
    }

    // Check crc
    if (checksum_) {
      uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header));
      uint32_t actual_crc = crc32c::Value(header + 6, 1 + length);
      if (actual_crc != expected_crc) {
        // Drop the rest of the buffer since "length" itself may have
        // been corrupted and if we trust it, we could find some
        // fragment of a real log record that just happens to look
        // like a valid log record.
        size_t drop_size = buffer_.size();
        buffer_.clear();
        ReportCorruption(drop_size, "checksum mismatch");
        return kBadRecord;
      }
    }

    buffer_.remove_prefix(kHeaderSize + length);

    // Skip physical record that started before initial_offset_
    if (end_of_buffer_offset_ - buffer_.size() - kHeaderSize - length <
        initial_offset_) {
      result->clear();
      return kBadRecord;
    }

    *result = Slice(header + kHeaderSize, length);
    return type;
  }
}

This reads the next physical record:

  1. Read block data into buffer_. If fewer than kBlockSize bytes come back, the end of the file was reached. On the next loop iteration, if buffer_.size() < kHeaderSize the header was truncated, so return kEof; similarly, if after parsing the header kHeaderSize + length > buffer_.size() at EOF, the payload is incomplete, so return kEof
  2. A zero-length record of kZeroType returns kBadRecord (such records are produced by the mmap-based preallocation in env_posix.cc)
  3. Verify the crc if checksum_ is set
  4. Skip physical records that started before initial_offset_
  5. Build result from the payload and return the record type

ReadRecord

bool Reader::ReadRecord(Slice* record, std::string* scratch) {
  if (last_record_offset_ < initial_offset_) {
    if (!SkipToInitialBlock()) {
      return false;
    }
  }

  scratch->clear();
  record->clear();
  bool in_fragmented_record = false;
  // Record offset of the logical record that we're reading
  // 0 is a dummy value to make compilers happy
  uint64_t prospective_record_offset = 0;

  Slice fragment;
  while (true) {
    const unsigned int record_type = ReadPhysicalRecord(&fragment);

    // ReadPhysicalRecord may have only had an empty trailer remaining in its
    // internal buffer. Calculate the offset of the next physical record now
    // that it has returned, properly accounting for its header size.
    uint64_t physical_record_offset =
        end_of_buffer_offset_ - buffer_.size() - kHeaderSize - fragment.size();

    if (resyncing_) {
      if (record_type == kMiddleType) {
        continue;
      } else if (record_type == kLastType) {
        resyncing_ = false;
        continue;
      } else {
        resyncing_ = false;
      }
    }

    switch (record_type) {
      case kFullType:
        if (in_fragmented_record) {
          // Handle bug in earlier versions of log::Writer where
          // it could emit an empty kFirstType record at the tail end
          // of a block followed by a kFullType or kFirstType record
          // at the beginning of the next block.
          if (!scratch->empty()) {
            ReportCorruption(scratch->size(), "partial record without end(1)");
          }
        }
        prospective_record_offset = physical_record_offset;
        scratch->clear();
        *record = fragment;
        last_record_offset_ = prospective_record_offset;
        return true;

      case kFirstType:
        if (in_fragmented_record) {
          // Handle bug in earlier versions of log::Writer where
          // it could emit an empty kFirstType record at the tail end
          // of a block followed by a kFullType or kFirstType record
          // at the beginning of the next block.
          if (!scratch->empty()) {
            ReportCorruption(scratch->size(), "partial record without end(2)");
          }
        }
        prospective_record_offset = physical_record_offset;
        scratch->assign(fragment.data(), fragment.size());
        in_fragmented_record = true;
        break;

      case kMiddleType:
        if (!in_fragmented_record) {
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(1)");
        } else {
          scratch->append(fragment.data(), fragment.size());
        }
        break;

      case kLastType:
        if (!in_fragmented_record) {
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(2)");
        } else {
          scratch->append(fragment.data(), fragment.size());
          *record = Slice(*scratch);
          last_record_offset_ = prospective_record_offset;
          return true;
        }
        break;

      case kEof:
        if (in_fragmented_record) {
          // This can be caused by the writer dying immediately after
          // writing a physical record but before completing the next; don't
          // treat it as a corruption, just ignore the entire logical record.
          scratch->clear();
        }
        return false;

      case kBadRecord:
        if (in_fragmented_record) {
          ReportCorruption(scratch->size(), "error in middle of record");
          in_fragmented_record = false;
          scratch->clear();
        }
        break;

      default: {
        char buf[40];
        std::snprintf(buf, sizeof(buf), "unknown record type %u", record_type);
        ReportCorruption(
            (fragment.size() + (in_fragmented_record ? scratch->size() : 0)),
            buf);
        in_fragmented_record = false;
        scratch->clear();
        break;
      }
    }
  }
  return false;
}

This reads the next logical record:

  1. Seek to the caller-specified position according to initial_offset_ before reading; the jump is performed through SequentialFile's Skip interface
  2. Initialize in_fragmented_record = false; it is set to true when a kFirstType fragment is encountered, marking that the logical record is assembled from fragments spread across multiple blocks
  3. Initialize prospective_record_offset = 0; this is the offset of the logical record being read
  4. Loop until the whole logical record has been read:
    1. Read the next physical record
    2. Dispatch on record_type. For kFullType, update last_record_offset_ and return; for kFirstType, fill scratch and set in_fragmented_record to true; for kMiddleType, first check that in_fragmented_record is true, reporting "missing start of fragmented record(1)" if it is not, otherwise append to scratch; for kLastType, check in_fragmented_record the same way, append to scratch, assign it to record, update last_record_offset_, and return; any other type triggers error handling
    3. Note that kFullType and kFirstType also check in_fragmented_record. This is a workaround for a bug in early versions of log::Writer, which could emit an empty kFirstType record at the tail end of a block followed by a kFullType or kFirstType record at the start of the next block; no recovery is attempted here beyond calling ReportCorruption
