DCache-CacheServer分析(六)
DCache支持定期将内存“脏”数据自动写入后端数据库,而且不需要编写任何一行代码。
本文介绍CacheServer回写线程(SyncThread)的功能以及处理流程。
SyncThread
功能简介
定时回写脏数据到后端db,同步“回写时间”到备机。
这是DCache的亮点功能。DChace会将未持久化的数据(注意!这里指的是未持久化到后端db)的Key的地址插入“Dirty链表”,由该线程轮询处理,将脏数据传送给对应的DbAcess服务写后端db;写入成功后,将该数据从Dirty链中删除。
这个功能只需要进行几步简单的配置就可实现,不需要开发任何代码。
相关文件:
SyncThread.h
SyncThread.cpp
配置参数解析
#每次回写时间间隔(秒)
SyncInterval=300
#回写频率, 0 表示不限制
SyncSpeed=0
#回写脏数据的线程数
SyncThreadNum=1
#回写时间(秒),即回写多久以前的数据
SyncTime=300
#屏蔽回写时间段(例:0900-1000;1600-1700)
SyncBlockTime=0000-0000
#解除屏蔽回写的脏数据比率
SyncUNBlockPercent=60
SyncInterval | 扫描Dirty链表的时间间隔,由此参数控制数据同步的频率,单位秒 |
SyncSpeed |
每次 瞬时同步的最大记录数;如果超过该值,则休眠10秒再进行;配置为0时无此限制; |
SyncThreadNum |
同步线程数。 |
SyncTime | 回写多久以前的数据。如在00:00:00时将数据set进内存,那么SyncTime秒后才会将数据写入后端db中。 |
SyncBlockTime |
在这个时间段内,不写后端库。 多用于业务高峰期、后端db压力大时,需要屏蔽。 |
SyncUNBlockPercent | 在SyncBlockTime时间段内,如果内存中的脏数据大于这个比例时,SyncBlockTime失效,会立即执行回写操作。 |
DCache给我们提供了丰富配置能力,使用起来非常灵活,可以结合业务压力情况进行设置。
例如,某个业务表的“写”压力很大,而且后端系统还需要从db中读到实时数据,那么就可以如下设置:
#每秒扫描一次脏数据链表
SyncInterval=1
#不限制每次同步的数据量
SyncSpeed=0
#数据量太大,1个线程处理不过来,需要10个甚至更多线程并行处理
SyncThreadNum=10
#内存数据最多1秒就落库了(这是因为SyncInterval=1)
SyncTime=0
这样配置,可以保证1秒之前写入内存的数据在与表数据是一致的,满足业务要求。
注意:
SyncInterval不要设置太小,SyncThreadNum不要设置过大,否则后端db可能吃不消!不要过分追求实时性、一致性,满足业务要求即可。
线程处理逻辑
先看SyncThread源码的入口函数 run:
void* SyncThread::Run(void* arg)
{pthread_detach(pthread_self());SyncThread* pthis = (SyncThread*)arg;pthis->setRuning(true);CachePrx pMasterCachePrx;string sBakSourceAddr = "";time_t tLastDb = 0;while (pthis->isStart()){try{TC_ThreadPool twpool;twpool.init(pthis->getThreadNum());twpool.start();
// TC_Functor<void, TL::TLMaker<time_t>::Result> cmd(pthis, &SyncThread::syncData);time_t tNow = TC_TimeProvider::getInstance()->getNow();if (tNow - tLastDb >= pthis->_syncDbInterval) //这里是主循环,每_syncDbInterval秒执行一次{if (g_app.gstat()->serverType() == MASTER) //只有主节点才能回写数据库{pthis->sync(); //将脏数据尾指针赋值给回写数据尾指针
// TC_Functor<void, TL::TLMaker<time_t>::Result>::wrapper_type fw(cmd, tNow);for (size_t i = 0; i < twpool.getThreadNum(); i++) //启动SyncThreadNum个线程同时处理 {//核心处理逻辑在 SyncThread::syncData 中twpool.exec(std::bind(&SyncThread::syncData, pthis, tNow));}twpool.waitForAllDone();pthis->_syncTime = tNow;TLOGDEBUG("SyncThread::Run, master sync data, t= " << TC_Common::tm2str(pthis->_syncTime) << endl);}else if (g_app.gstat()->serverType() == SLAVE){string sTmpCacheAddr = pthis->geBakSourceAddr();if (sTmpCacheAddr.length() > 0){if (sTmpCacheAddr != sBakSourceAddr){TLOGDEBUG("MasterCacheAddr changed from " << sBakSourceAddr << " to " << sTmpCacheAddr << endl);sBakSourceAddr = sTmpCacheAddr;pMasterCachePrx = Application::getCommunicator()->stringToProxy<CachePrx>(sBakSourceAddr);}time_t tSync = pMasterCachePrx->getSyncTime(); //获取Master节点的SyncTime,即上一次的回写时间pthis->_syncTime = tSync; //保存到Slave节点的内存中if (tSync > 60){time_t tSyncSlave = tSync - 60;pthis->sync();
// TC_Functor<void, TL::TLMaker<time_t>::Result>::wrapper_type fw(cmd, tSyncSlave);for (size_t i = 0; i < twpool.getThreadNum(); i++){twpool.exec(std::bind(&SyncThread::syncData, pthis, tSyncSlave));}twpool.waitForAllDone();}TLOGDEBUG("slave sync data, t= " << TC_Common::tm2str(tSync) << " - 60" << endl);}}tLastDb = tNow;}sleep(1);}catch (const TarsException & ex){TLOGERROR("SyncThread::Run: exception: " << ex.what() << endl);usleep(100000);}catch (const std::exception &ex){TLOGERROR("SyncThread::Run: exception: " << ex.what() << endl);usleep(100000);}catch (...){TLOGERROR("SyncThread::Run: unkown exception: " << endl);usleep(100000);}}pthis->setRuning(false);pthis->setStart(false);return NULL;
}
关键的代码我已经添加了注释。可以看到,run干了2件事:
- Master节点校准同步指针、执行回写 SyncThread::syncData;
- Slave节点同步了回写时间到本地(用于判断是否可以主备切换,后续会介绍);看代码逻辑,Slave好像也执行了回写,其实不是的,在底层函数 CacheStringToDoFunctor::sync 中有判断,只有Master会执行回写操作
再来分析下 SyncThread::syncData
void SyncThread::syncData(time_t t)
{time_t tBegin = TC_TimeProvider::getInstance()->getNow();CanSync& canSync = g_app.gstat()->getCanSync();while (isStart()){int iRet;time_t tNow = TC_TimeProvider::getInstance()->getNow();//今天凌晨开始的秒数time_t nows = (tNow + 28800) % 86400;//检查是否屏蔽回写if (_blockTime.size() > 0){vector<pair<time_t, time_t> >::iterator it = _blockTime.begin();while (it != _blockTime.end()){if (it->first <= nows && nows <= it->second){TLOGDEBUG("[SyncThread::syncData] block sync data! " << nows << endl);sleep(30);break;}it++;}if (it != _blockTime.end())continue;}// 这里判断 回写速率,如果超过了配置的SyncSpeed,就休眠10sif (_syncSpeed > 0 && tBegin == tNow && _syncCount > _syncSpeed){usleep(10000);}else{if (tBegin < tNow){_syncCount = 0;tBegin = tNow;}// 底层回写,CacheServer对应的函数是 CacheStringToDoFunctor::synciRet = g_sHashMap.syncOnce(t, canSync);if (iRet == TC_HashMapMalloc::RT_OK){break;}else if (iRet == TC_HashMapMalloc::RT_NEED_SYNC){_syncCount++;}else if (iRet != TC_HashMapMalloc::RT_NONEED_SYNC && iRet != TC_HashMapMalloc::RT_ONLY_KEY){TLOGERROR("SyncThread::syncData sync data error:" << iRet << endl);g_app.ppReport(PPReport::SRP_CACHE_ERR, 1);break;}}}if (!isStart()){TLOGDEBUG("SyncThread by stop" << endl);}else{TLOGDEBUG("syncData finish" << endl);}
}
- 这个函数先判断当前时间是否在屏蔽时间(SyncBlockTime)范围内,如果不在这个范围才会进行回写;
- 再判断回写速率,是否超出SyncSpeed的限制;
- 上述条件都满足时,才会调用g_sHashMap.syncOnce进行回写处理,对应的底层处理函数是CacheStringToDoFunctor::sync
说明:
SyncUNBlockPercent只在MKVCacheServer(二级索引)中会参与判断。KVCacheServer中不考虑比率的事情。这可能是个bug。
接下来我们再来看一下核心的回写函数CacheStringToDoFunctor::sync
void CacheStringToDoFunctor::sync(const CacheStringToDoFunctor::DataRecord &data)
{{TC_ThreadLock::Lock lock(_lock);_syncKeys.insert(data._key);++_syncCnt;}try {if (g_route_table.isTransfering(data._key)){//这里是数据迁移时的处理逻辑,不在这里讨论。直接看else if的正常处理流程}else if (g_app.gstat()->serverType() == MASTER && _hasDb && g_route_table.isMySelf(data._key)) {//到这里可以发现,只有MASTER会处理回写请求;前提是这个数据是自己分片的(isMySelf),而且后端开启了dbint iRet = 0;bool bEx = false;try{iRet = setDb(data); // 在这里调用了对应的dbacess服务,set数据到后端数据库if (iRet != eDbSucc){TLOGERROR("CacheStringToDoFunctor::sync error, ret = " << iRet << ", key = " << data._key << ", setDb again" << endl);iRet = setDb(data);if (iRet != eDbSucc){TLOGERROR("CacheStringToDoFunctor::sync error, ret = " << iRet << ", key = " << data._key << endl);FDLOG(_dbDayLog) << "set|" << data._key << "|Err|" << iRet << endl;g_app.ppReport(PPReport::SRP_DB_ERR, 1);}else{FDLOG(_dbDayLog) << "set|" << data._key << "|Succ|" << iRet << endl;}}else{FDLOG(_dbDayLog) << "set|" << data._key << "|Succ|" << iRet << endl;}}catch (const TarsException & ex){//这里的代码省略了}}}catch (exception& e){TLOGERROR("CacheStringToDoFunctor::sync exception: " << e.what() << endl);}catch (...){TLOGERROR("CacheStringToDoFunctor::sync unknown exception" << endl);}TC_ThreadLock::Lock lock(_lock);_syncKeys.erase(data._key);
}
核心代码已添加注释。
DCache在回写脏数据时考虑的也比较全面,当DCache在进行数据迁移的时候有独立的处理逻辑,这里我们不讨论,只看正常处理逻辑。大家要牢记重点:
- 回写的前提是有对接后端db;
- 而且存在分片的情况下每个节点只能回写属于自己分片的数据、保证不冲突;
- 只有Master会进行回写
总结
DCache自动持久化到DB的能力,为我们技术架构的构建提供了无限的想象力。在大多数场景下我们的应用可以只对接DCache,同步数据库的操作交给DCache来完成。相较于Redis的双写操作,这大幅提升了效率,也避免了数据一致性、失败回滚等等问题。
DCache-CacheServer分析(六)相关推荐
- 【Java 虚拟机原理】Class 字节码二进制文件分析 六 ( 属性类型 | Code 属性 | 属性名称索引 | 属性长度 | 操作数栈最大深度 | 局部变量存储空间 | 字节码长度 )
文章目录 前言 一.属性类型 二.Code 属性表数据结构 三.属性名称索引 四.属性长度 五.操作数栈最大深度 六.局部变量存储空间 七.字节码长度 八.存储字节码指令的一系列字节流 前言 上一篇博 ...
- 【生产者分析六】Kafka生产者Tips
1.一个batch什么条件下可以发送出去 上面我们介绍了Sender线程发送那个消息的大概流程,接下来我们来分析一下一个batch的数据在什么情况下会发送出去? 回顾发送消息的时候,生产者需要指定的相 ...
- Tomcat启动过程源码分析六
前言 上一篇文章中我们讨论了Catalina类中start方法中一部分,今天这篇文章我们把Catalina类的start方法剩余部分讲解完毕,在讲解代码之前我们先看之前的一篇关于ShutdownHoo ...
- 谷歌chrome浏览器的源码分析(六)
消息的流通过程,是一个不同类相互交流的过程,如果不了解这个过程,根本就不知道这些类是怎么样相互协作的.由于上一次说到ViewHostMsg_RequestResource消息已经发送出来,它的处理过徎 ...
- Android Telephony分析(六) ---- 接口扩展(实践篇)
本文将结合前面五篇文章所讲解的知识,综合起来,实现一个接口扩展的功能. 如果还没有阅读过前面五篇文章的内容,请先阅读: <Android Telephony分析(一) - Phone详解 & ...
- 【转】ABP源码分析六:依赖注入的实现
ABP的依赖注入的实现有一个本质两个途径:1.本质上是依赖于Castle这个老牌依赖注入的框架.2.一种实现途径是通过实现IConventionalDependencyRegistrar的实例定义注入 ...
- Linux 网桥代码分析 六 网桥数据转发函数分析
对于数据包转发函数,主要是分为两大类:数据转发到指定端口.数据扩散到所有端口. 下面就从这两方面进行分析: 一 数据转发到指定端口 对于数据转发到指定端口的功能,也可以分为两个方面:对入口流量进行的 ...
- STA分析(六) cross talk and noise
在深亚微米技术(deep submicron)中,关于crosstalk和noise对design的signal integrate的影响越来越大.主要表现在glitch和对delay的影响. 1)m ...
- 网狐框架分析六--整体框架 20150623
--- # 1.首先得分析网狐整套流程(原版) 客户端和服务器的交互: 客户端登录过程(socket连接,服务器接收,数据库判断,返回验证消息,建立连接) 客户 ...
- motan源码分析六:客户端与服务器的通信层分析
本章将分析motan的序列化和底层通信相关部分的代码. 1.在上一章中,有一个getrefers的操作,来获取所有服务器的引用,每个服务器的引用都是由DefaultRpcReferer来创建的 pub ...
最新文章
- 使用nsenter进入docker容器后端报错 mesg: ttyname failed: No such file or directory
- linux集群搭建coolrainbow,Rainbow°110408_教程▍KBFS听歌学韩语—So Cool[Rainbow]
- 真诚推荐几个最值得关注的前端公众号
- 全面解析resultType和resultMap的区别
- 【杂文】spring-boot报错 ~ zone value ‘Öйú±ê׼ʱ¼ä‘ is unrecognized or represents more than on time zone.
- oracle 12g 无监听,Oracle 12.2监听无法启动解决一例
- 性能测试--jmeter中XPath断言【10】
- Android平台的通话计时源码
- SharePoint 2010-在ribbon上添加表单,将默认control加到自定义group中
- 本地域名转向Hosts文件位置
- IDEA主题配置--- 炫酷的主题字体颜色设置(基于Intellij IDEA 2018)
- 腾讯云 直播 OBS 在线推流
- 百度APP“看听模式”:“AI主播”借道信息流全面落地?
- PASCAL VOC DATASET
- 十分钟带你做一个学生管理系统
- 胡适说:他拍的是真正如画的北京
- ASP.NET图片添加水印
- 【Pytorch】带注释的Transformer (各个部件的实现及应用实例)
- 云栖回顾|龙蜥社区有哪些值得回味的精彩瞬间?
- nginx 配置格式化工具