Redis复制积压缓冲区源码实现及其图解
文章目录
- 1.backlog是什么
- 2.backlog实现
- 2.1 backlog初始化
- 2.2 积累buffer
- 2.3 修改大小
- 3.图解backlog
1.backlog是什么
- 复制积压缓冲区(backlog)是保存在主节点固定长度的队列,默认1M,当主节点有连接的slave时创建,主节点响应写请求时,不但会将命令发给slave,也会写到自己backlog buffer中
- 缓冲区是先进先出队列,所以能实现保存最近已复制的数据,用来实现增量复制和复制断连的数据补偿。
- 原生的设计,当master从未连接过slave的时候不保存backlog,有slave后开始保存backlog buffer,当maste所有slave断开的时候,backlog就会释放,dcloud redis稍有不同,即:当master从未连接过slave的时候不保存backlog,有slave后开始保存backlog buffer,但是当maste所有slave断开的时候,backlog不会释放。
- 从内存占用上看,backlog buffer占用maxmemory的配置
上面虽然说得有些枯燥,但是都是干货了,字字都需要加粗那种,下面关于backlog的实现涉及一些源码,如果觉得更枯燥,看不懂,那就直接第三部分看图。
2.backlog实现
2.1 backlog初始化
backlog相关的变量存在Redis最主要的结构体redisServer
。
struct redisServer {......char *repl_backlog; /* Replication backlog for partial syncs */long long repl_backlog_size; /* Backlog circular buffer size */long long repl_backlog_histlen; /* Backlog actual data length */long long repl_backlog_idx; /* Backlog circular buffer current offset,that is the next byte will'll write to.*/long long repl_backlog_off; /* Replication "master offset" of firstbyte in the replication backlog buffer.*/......
}
挑我们用到的结构体变量说明一下。
- repl_backlog:对应INFO状态中:repl_backlog_active,只要repl_backlog非空,repl_backlog_active就等于1,也就意味着只要开始积累buffer,那么active就为1。
简化一点看:
char *server.repl_backlog = "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n";
printf("repl_backlog_active:%d\r\n",server.repl_backlog != NULL);
- repl_backlog_size:是设置的复制积压缓冲区总长度,对应INFO状态中:repl_backlog_size
- repl_backlog_histlen:是从初始化或者重置backlog到现在命令请求的总长度,对应INFO状态中:repl_backlog_histlen。
- repl_backlog_idx:是当前backlog存储命令请求最后一个字节index的位置,也就是下一次记录backlog从这一位开始写。
- repl_backlog_off:是当前backlog第一个字节的复制偏移量,个人认为理解为是保存backlog的偏移的最小值比较形象,对应INFO状态中:repl_backlog_first_byte_offset。
如下是刚启动redis实例的时候redis的info状态,
127.0.0.1:16379> info replication
# Replication
role:master
connected_slaves:0
master_replid:f08c90839d2c847a95864dcc92824720b765dfd7
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:0
second_repl_offset:-1
repl_backlog_active:0 //此时还为0
repl_backlog_size:16384 //最大16kb
repl_backlog_first_byte_offset:0 //0
repl_backlog_histlen:0 //0
那么什么时候开始backlog buffer开始登场了呢?我们想一下既然backlog是为了复制重连和增量复制,那么至少要master有slave之后吧,所以当master接受slave发送的sync之后,syncCommand
会处理,只要master开始有一个slave并且当前的backlog为空,就会调用createReplicationBacklog
。
void syncCommand(client *c) {if (listLength(server.slaves) == 1 && server.repl_backlog == NULL) {createReplicationBacklog();}}
所以从现在开始backlog开辟内存,正式登场了。
void createReplicationBacklog(void) {serverAssert(server.repl_backlog == NULL);// 为repl_backlog开辟所需内存空间server.repl_backlog = zmalloc(server.repl_backlog_size);server.repl_backlog_histlen = 0;server.repl_backlog_idx = 0;/* We don't have any data inside our buffer, but virtually the first* byte we have is the next byte that will be generated for the* replication stream. */server.repl_backlog_off = server.master_repl_offset+1;
}
现在我们给master加个slave上去,先不写数据,看看此时info的状态
127.0.0.1:16379> info replication
# Replication
role:master
connected_slaves:1
slave0:ip=x.x.x.x,port=6380,state=online,offset=0,lag=0
master_replid:630e21f2a038f946daabcd39465db9742358d1be
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:0
second_repl_offset:-1
repl_backlog_active:1
repl_backlog_size:16384
repl_backlog_first_byte_offset:1
repl_backlog_histlen:0
可以看到repl_backlog_active已经为1(因为backlog非空了),下面我们开始写一些数据,看一下backlog怎么积攒的buffer。
2.2 积累buffer
当master上写了数据就将数据放入backlog并且发给slave,所以我们从replicationFeedSlaves
开始看,这个函数的主要作用就是补充backlog然后传播请求给slave,(这里我们不考虑slave有slave的这种“master”实例,只考虑单纯master)
那replicationFeedSlaves步骤到底是怎样的?简要概括下三个步骤:
- 步骤一:检查本次请求是不是切换了db,如果切换了db就积累select DB的backlog后传播select DB给slave
- 步骤二:将本次请求的参数个数转成协议格式"*参数数量\r\n",写到backlog
- 步骤三:根据命令的参数个数循环将命令写入backlog
下面是源码,供深入的同学参考,不想深入的跳过,直接往下看我挑出来的内容。
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {/* Send SELECT command to every slave if needed. *///dictid是本次命令的db,slaveseldb是上一次发给slave的db,所以只要切换了db,那么要先在backlog中写上当前的db,然后再发给slave,这样判断一下也避免如果一直不切换db,会一直写select到backlog和发给slaveif (server.slaveseldb != dictid) {robj *selectcmd;/* For a few DBs we have pre-computed SELECT command. */if (dictid >= 0 && dictid < PROTO_SHARED_SELECT_CMDS) {selectcmd = shared.select[dictid];} else {int dictid_len;dictid_len = ll2string(llstr,sizeof(llstr),dictid);selectcmd = createObject(OBJ_STRING,sdscatprintf(sdsempty(),"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",dictid_len, llstr));}/* Add the SELECT command into the backlog. *///if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd);/* Send it to slaves. */listRewind(slaves,&li);while((ln = listNext(&li))) {client *slave = ln->value;if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;addReply(slave,selectcmd);}if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS)decrRefCount(selectcmd);}server.slaveseldb = dictid;//上面的selectcmd的backlog积累完也发给slave后,就要修改此时的lastSlaveSelectDb/* Write the command to the replication backlog if any. */if (server.repl_backlog) {char aux[LONG_STR_SIZE+3];/* Add the multi bulk reply length. */aux[0] = '*';len = ll2string(aux+1,sizeof(aux)-1,argc);aux[len+1] = '\r';aux[len+2] = '\n';feedReplicationBacklog(aux,len+3);for (j = 0; j < argc; j++) {long objlen = stringObjectLen(argv[j]);/* We need to feed the buffer with the object as a bulk reply* not just as a plain string, so create the $..CRLF payload len* and add the final CRLF */aux[0] = '$';len = ll2string(aux+1,sizeof(aux)-1,objlen);aux[len+1] = '\r';aux[len+2] = '\n';//写入当前参数长度feedReplicationBacklog(aux,len+3);//写入当前参数feedReplicationBacklogWithObject(argv[j]);//写入当前参数结尾固定的"\r\n"feedReplicationBacklog(aux+len+1,2);}}
我们把replicationFeedSlaves
中和backlog积累相关的代码挑出来
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {if(本次请求切换了db){调用feedReplicationBacklogWithObject将"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n"写入backlog;}if(backlog不为空){调用feedReplicationBacklog将每个命令的参数个数协议写进backlog,也就是写“*3\r\n”for (j = 0; j < 参数个数(3); j++) {//j=0将$5\r\nlpush\r\n写入调用feedReplicationBacklog写入"$5\r\n"调用feedReplicationBacklogWithObject写入"lpush"调用feedReplicationBacklog写入"\r\n"}}
}
上面看完了以后,又出现了两个函数feedReplicationBacklog
和feedReplicationBacklogWithObject
,实际上这两个函数是真正干活,积累backlog buffer的,其中feedReplicationBacklogWithObject
也是调用feedReplicationBacklog
,只不过前者多处理一步将longlong类型的命令转为string后交给feedReplicationBacklog
处理,那么接下来看一下feedReplicationBacklog
。
feedReplicationBacklog
函数的功能就是将传参长度len与其对应字符串ptr,写到master_repl_offset,然后写到backlog。
首先还是贴源码,不想看的跳过这一块code,看我挑出来的内容。
void feedReplicationBacklog(void *ptr, size_t len) {unsigned char *p = ptr;server.master_repl_offset += len;/* This is a circular buffer, so write as much data we can at every* iteration and rewind the "idx" index if we reach the limit. */while(len) {size_t thislen = server.repl_backlog_size - server.repl_backlog_idx;if (thislen > len) thislen = len;memcpy(server.repl_backlog+server.repl_backlog_idx,p,thislen);server.repl_backlog_idx += thislen;if (server.repl_backlog_idx == server.repl_backlog_size)server.repl_backlog_idx = 0;len -= thislen;p += thislen;server.repl_backlog_histlen += thislen;}if (server.repl_backlog_histlen > server.repl_backlog_size)server.repl_backlog_histlen = server.repl_backlog_size;/* Set the offset of the first byte we have in the backlog. */server.repl_backlog_off = server.master_repl_offset -server.repl_backlog_histlen + 1;
}
feedReplicationBacklog
函数对backlog的补充规则就是,如果写入的时候,backlog中剩下的空间还够,那么就写入len的长度到backlog,如果本次写入的时候,backlog所剩长度不足本次字符长度,那么先把剩余backlog的长度用字符串补充完整,然后再while(len)循环一次,把剩下字符串的补上。
void feedReplicationBacklog(void *ptr, size_t len) {复制偏移量=复制偏移量+len;while(len){//thislen:容量允许写入长度(thislen = 总大小 - 索引idx;//thislen:本次写入长度if(容量允许写入长度 > 指定写入长度) thislen = len;//将本次写入长度累积到backlog将thislen写入长度累积到backlog;索引idx = 索引idx + 本次写入长度;if(索引idx = backlog最大值) 索引idx = 0;len = len - 本次写入长度;repl_backlog_histlen = repl_backlog_histlen + 本次写入长度;}if(backlog_histlen > backlog总大小) backlog_histlen = backlog总大小;backlog第一个字节的位置 = 复制偏移量 - 当前backlog大小 + 1
}
上面说完了backlog的积累,和backlog大小相关的处理还剩下backlog大小的调整,所以接下来看一下修改backlog大小。
2.3 修改大小
我们连接上redis-cli,调整大小,然后info查一下。
127.0.0.1:6380> config set repl-backlog-size 256mb
OK
127.0.0.1:6380> config get repl-backlog-size
1) "repl-backlog-size"
2) "268435456"
127.0.0.1:6380> info replication
# Replication
role:master
connected_slaves:1
slave0:ip=127.0.0.1,port=6381,state=online,offset=99,lag=1
master_replid:4fad80986b1e0b8b839b35566772b8d6527bdde7
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:99
second_repl_offset:-1
repl_backlog_active:1
repl_backlog_size:268435456
repl_backlog_first_byte_offset:100
repl_backlog_histlen:0
info查询完发现repl_backlog_histlen
为0了,也就是我们修改backlog之后,这块内存空间被重置了,那么这地方到底是如何处理的?
修改复制积压缓冲区的函数为resizeReplicationBacklog
,调用传参为修改的新值
void resizeReplicationBacklog(long long newsize) {// 如果设置的新值小于16K,newsize=16K,也就是最小值都要是16Kif (newsize < CONFIG_REPL_BACKLOG_MIN_SIZE)newsize = CONFIG_REPL_BACKLOG_MIN_SIZE;// 如果设置的新值等于当前值那么不需要处理,直接返回if (server.repl_backlog_size == newsize) return;//repl_backlog_size赋值大小为新的长度server.repl_backlog_size = newsize;if (server.repl_backlog != NULL) {/* What we actually do is to flush the old buffer and realloc a new* empty one. It will refill with new data incrementally.* The reason is that copying a few gigabytes adds latency and even* worse often we need to alloc additional space before freeing the* old buffer. 这段官方的注释是对调整大小的方式的解释,处理方式为清除老的backlog缓冲的内容,重新申请一片未使用的空间,之所以这么做的原因有两个考虑,首先拷贝几个G的backlog到新的空间会造成延迟,其次更糟糕的是在释放旧空间之前先要申请新的内存(这就要占用两个backlog大小的内存)*/// buffer直接释放掉zfree(server.repl_backlog);//分配新的大小空间的内存给backlogserver.repl_backlog = zmalloc(server.repl_backlog_size);//积累的buffer即repl_backlog_histlen=0,server.repl_backlog_histlen = 0;server.repl_backlog_idx = 0;/* Next byte we have is... the next since the buffer is empty. */// 这里要注意缓冲区第一位是当前复制offset+1,而不是0// 因为增量复制是使用偏移量的,对比的也是repl_backlog_offserver.repl_backlog_off = server.master_repl_offset+1;}
}
3.图解backlog
上面的代码看过之后,如果还没有看懂,我将backlog积累过程画出来,试试看,看完会不会更理解一些。
- 当master没有slave的时候,是不会开辟backlog内存空间的,所以我们看有slave的时候master的backlog:
- 接下来master上开始写一些数据,backlog累积了select 0和set a b,如下图:
[解释]: backlog 是数组,而不要因为上图误解为链表,是因为篇幅放不下,我才把图片才分成两行显示
- 写了一段时间,数据越来越多,直到数据快写满backlog最大值了,这时候’$9\r\n’已经写完了,马上写对应的key:‘MilkCandy’,这时候发现backlog-buffer只剩下8个字符空间了。
- 此时就会将’MilkCandy’分成’MilkCand’和’y’两次写入
至此backlog第一次被填满,以后的每一次都是这样处理的,上面的举例是当master没有写数据的时候添加slave,还有加入slave之前master_repl_offset就已经很大的情况,以及backlog调整大小导致buffer被重置之后的情况,大家也可以试着画画。
Redis复制积压缓冲区源码实现及其图解相关推荐
- 【作者面对面问答】包邮送《Redis 5设计与源码分析》5本
墨墨导读:本文节选自<Redis 5设计与源码分析>,主要为读者分析Redis高性能内幕,重点从源码层次讲解了Redis事件模型,网络IO事件重在使用IO复用模型,时间事件重在限制最大执行 ...
- 选redis还是memcache,源码怎么说?
选redis还是memcache,源码怎么说? memcache和redis是互联网分层架构中,最常用的KV缓存.不少同学在选型的时候会纠结,到底是选择memcache还是redis. 画外音:不鼓励 ...
- 霸榜巨作、阿里内部顶级专家整理(Redis 5设计与源码分析)
前言 在开源界,高性能服务的典型代表就是Nginx和Redis.纵观这两个软件的源码,都是非常简洁高效的,也都是基于异步网络I/O机制的,所以对于要学习高性能服务的程序员或者爱好者来说,研究这两个网络 ...
- 新书推荐 |《Redis 5设计与源码分析》
新书推荐 <Redis 5设计与源码分析> 点击上图了解及购买 好未来.滴滴.百度等公司专家联合撰写,掌握Redis 5设计与命令实现,透彻掌握分布式缓存. 编辑推荐 多名专家联袂推荐,资 ...
- Redis分布式锁解析源码分析
Redis分布式锁解析&源码分析 概述 实战 简单的分布式锁 Redisson实现分布式锁 Redission源码分析 构造方法 获取锁lock 解锁 锁失效 红锁 案例分析 原始的写法 进化 ...
- redis 4.0.8 源码包安装集群
系统:centos 6.9 软件版本:redis-4.0.8,rubygems-2.7.7,gcc version 4.4.7 20120313,openssl-1.1.0h,zlib-1.2.11 ...
- linux 循环缓冲区 源码,Linux中的循环缓冲区
在学习到 并发和竞态 时,其中的提到了缓冲区,用于实现免锁算法,这里转载的是大神有关循环缓冲区做的一些操作. 其中源代码在最下面的附件中,有关作者的讲解感觉很清晰,很好,不过这里说一下自己的见解: 点 ...
- Redis 2.8.9源码 - Redis中的字符串实现 sds
2019独角兽企业重金招聘Python工程师标准>>> 本文为作者原创,转载请注明出处:http://my.oschina.net/fuckphp/blog/269167 在C中子字 ...
- Redis集群模式源码分析
目录 1 主从复制模式 2 Sentinel(哨兵)模式 3 Cluster模式 4.参考文档 1 主从复制模式 主库负责读写操作,从库负责数据同步,接受来自主库的同步命令.通过分析Redis的客户端 ...
最新文章
- 人的一生有三件事不能等
- 独家 | 用XGBoost入门可解释机器学习
- ORA-600 [kddummy_blkchk] [18038] 一例
- ADT,Eclipse启动时在Android SDK Content Loader0%无法编译
- JVM -- 运行时栈帧结构简介
- 2021-9-下旬 数据结构-线性表-队列-java代码实现(复习用)
- C# 视频监控系列(14):总结贴——VC++代码转成C#小结
- 济源一中2021高考成绩查询入口,济源一中2019高考成绩喜报、一本二本上线人数情况...
- 原创:协同过滤之ALS
- 东芝MCU实现位带操作
- 泛函、变分与欧拉-拉格朗日方程
- 《此生未完成读书总结》
- 【开发日常】什么是标准?什么是协议?标准和协议之间是什么关系?
- 奥升德推出Acteev Protect™抗菌技术
- 汪涵曾因太穷被前妻离婚,杨乐乐因一个动作就把初恋男友甩了
- 终于,字节跳动要取消大小周了,我 1.7 万人的票圈都快炸了!
- Now trying to drop the old temporary tablespace, the session hangs.
- 把int转换为char把int转换为char
- Linux ubuntu 服务器部署详细教程
- iWO(联通3G详单及套餐使用情况查询工具)更新至v0.7