DCache支持定期将内存“脏”数据自动写入后端数据库,而且不需要编写任何一行代码。

本文介绍CacheServer回写线程(SyncThread)的功能以及处理流程。

SyncThread

功能简介

定时回写脏数据到后端db,同步“回写时间”到备机。

这是DCache的亮点功能。DChace会将未持久化的数据(注意!这里指的是未持久化到后端db)的Key的地址插入“Dirty链表”,由该线程轮询处理,将脏数据传送给对应的DbAcess服务写后端db;写入成功后,将该数据从Dirty链中删除。

这个功能只需要进行几步简单的配置就可实现,不需要开发任何代码。

相关文件:

SyncThread.h

SyncThread.cpp

配置参数解析

#每次回写时间间隔(秒)

SyncInterval=300

#回写频率, 0 表示不限制

SyncSpeed=0

#回写脏数据的线程数

SyncThreadNum=1

#回写时间(秒),即回写多久以前的数据

SyncTime=300

#屏蔽回写时间段(例:0900-1000;1600-1700)

SyncBlockTime=0000-0000

#解除屏蔽回写的脏数据比率

SyncUNBlockPercent=60

SyncInterval 扫描Dirty链表的时间间隔,由此参数控制数据同步的频率,单位秒
SyncSpeed

每次 瞬时同步的最大记录数;如果超过该值,则休眠10秒再进行;配置为0时无此限制;

SyncThreadNum

同步线程数。

SyncTime 回写多久以前的数据。如在00:00:00时将数据set进内存,那么SyncTime秒后才会将数据写入后端db中。
SyncBlockTime

在这个时间段内,不写后端库。

多用于业务高峰期、后端db压力大时,需要屏蔽。

SyncUNBlockPercent 在SyncBlockTime时间段内,如果内存中的脏数据大于这个比例时,SyncBlockTime失效,会立即执行回写操作。

DCache给我们提供了丰富配置能力,使用起来非常灵活,可以结合业务压力情况进行设置。

例如,某个业务表的“写”压力很大,而且后端系统还需要从db中读到实时数据,那么就可以如下设置:

#每秒扫描一次脏数据链表

SyncInterval=1

#不限制每次同步的数据量

SyncSpeed=0

#数据量太大,1个线程处理不过来,需要10个甚至更多线程并行处理

SyncThreadNum=10

#内存数据最多1秒就落库了(这是因为SyncInterval=1)

SyncTime=0

这样配置,可以保证1秒之前写入内存的数据在与表数据是一致的,满足业务要求。

注意:

SyncInterval不要设置太小,SyncThreadNum不要设置过大,否则后端db可能吃不消!不要过分追求实时性、一致性,满足业务要求即可。

线程处理逻辑

先看SyncThread源码的入口函数 run:

void* SyncThread::Run(void* arg)
{pthread_detach(pthread_self());SyncThread* pthis = (SyncThread*)arg;pthis->setRuning(true);CachePrx pMasterCachePrx;string sBakSourceAddr = "";time_t tLastDb = 0;while (pthis->isStart()){try{TC_ThreadPool twpool;twpool.init(pthis->getThreadNum());twpool.start();
//            TC_Functor<void, TL::TLMaker<time_t>::Result> cmd(pthis, &SyncThread::syncData);time_t tNow = TC_TimeProvider::getInstance()->getNow();if (tNow - tLastDb >= pthis->_syncDbInterval)  //这里是主循环,每_syncDbInterval秒执行一次{if (g_app.gstat()->serverType() == MASTER) //只有主节点才能回写数据库{pthis->sync();  //将脏数据尾指针赋值给回写数据尾指针
//                    TC_Functor<void, TL::TLMaker<time_t>::Result>::wrapper_type fw(cmd, tNow);for (size_t i = 0; i < twpool.getThreadNum(); i++) //启动SyncThreadNum个线程同时处理 {//核心处理逻辑在 SyncThread::syncData 中twpool.exec(std::bind(&SyncThread::syncData, pthis, tNow));}twpool.waitForAllDone();pthis->_syncTime = tNow;TLOGDEBUG("SyncThread::Run, master sync data, t= " << TC_Common::tm2str(pthis->_syncTime) << endl);}else if (g_app.gstat()->serverType() == SLAVE){string sTmpCacheAddr = pthis->geBakSourceAddr();if (sTmpCacheAddr.length() > 0){if (sTmpCacheAddr != sBakSourceAddr){TLOGDEBUG("MasterCacheAddr changed from " << sBakSourceAddr << " to " << sTmpCacheAddr << endl);sBakSourceAddr = sTmpCacheAddr;pMasterCachePrx = Application::getCommunicator()->stringToProxy<CachePrx>(sBakSourceAddr);}time_t tSync = pMasterCachePrx->getSyncTime(); //获取Master节点的SyncTime,即上一次的回写时间pthis->_syncTime = tSync; //保存到Slave节点的内存中if (tSync > 60){time_t tSyncSlave = tSync - 60;pthis->sync();
//                            TC_Functor<void, TL::TLMaker<time_t>::Result>::wrapper_type fw(cmd, tSyncSlave);for (size_t i = 0; i < twpool.getThreadNum(); i++){twpool.exec(std::bind(&SyncThread::syncData, pthis, tSyncSlave));}twpool.waitForAllDone();}TLOGDEBUG("slave sync data, t= " << TC_Common::tm2str(tSync) << " - 60" << endl);}}tLastDb = tNow;}sleep(1);}catch (const TarsException & ex){TLOGERROR("SyncThread::Run: exception: " << ex.what() << endl);usleep(100000);}catch (const std::exception &ex){TLOGERROR("SyncThread::Run: exception: " << ex.what() << endl);usleep(100000);}catch (...){TLOGERROR("SyncThread::Run: unkown exception: " << endl);usleep(100000);}}pthis->setRuning(false);pthis->setStart(false);return NULL;
}

关键的代码我已经添加了注释。可以看到,run干了2件事:

  1. Master节点校准同步指针、执行回写 SyncThread::syncData;
  2. Slave节点同步了回写时间到本地(用于判断是否可以主备切换,后续会介绍);看代码逻辑,Slave好像也执行了回写,其实不是的,在底层函数 CacheStringToDoFunctor::sync 中有判断,只有Master会执行回写操作

再来分析下 SyncThread::syncData

void SyncThread::syncData(time_t t)
{time_t tBegin = TC_TimeProvider::getInstance()->getNow();CanSync& canSync = g_app.gstat()->getCanSync();while (isStart()){int iRet;time_t tNow = TC_TimeProvider::getInstance()->getNow();//今天凌晨开始的秒数time_t nows = (tNow + 28800) % 86400;//检查是否屏蔽回写if (_blockTime.size() > 0){vector<pair<time_t, time_t> >::iterator it = _blockTime.begin();while (it != _blockTime.end()){if (it->first <= nows && nows <= it->second){TLOGDEBUG("[SyncThread::syncData] block sync data! " << nows << endl);sleep(30);break;}it++;}if (it != _blockTime.end())continue;}// 这里判断 回写速率,如果超过了配置的SyncSpeed,就休眠10sif (_syncSpeed > 0 && tBegin == tNow && _syncCount > _syncSpeed){usleep(10000);}else{if (tBegin < tNow){_syncCount = 0;tBegin = tNow;}// 底层回写,CacheServer对应的函数是 CacheStringToDoFunctor::synciRet = g_sHashMap.syncOnce(t, canSync);if (iRet == TC_HashMapMalloc::RT_OK){break;}else if (iRet == TC_HashMapMalloc::RT_NEED_SYNC){_syncCount++;}else if (iRet != TC_HashMapMalloc::RT_NONEED_SYNC && iRet != TC_HashMapMalloc::RT_ONLY_KEY){TLOGERROR("SyncThread::syncData sync data error:" << iRet << endl);g_app.ppReport(PPReport::SRP_CACHE_ERR, 1);break;}}}if (!isStart()){TLOGDEBUG("SyncThread by stop" << endl);}else{TLOGDEBUG("syncData finish" << endl);}
}
  1. 这个函数先判断当前时间是否在屏蔽时间(SyncBlockTime)范围内,如果不在这个范围才会进行回写;
  2. 再判断回写速率,是否超出SyncSpeed的限制;
  3. 上述条件都满足时,才会调用g_sHashMap.syncOnce进行回写处理,对应的底层处理函数是CacheStringToDoFunctor::sync

说明:

SyncUNBlockPercent只在MKVCacheServer(二级索引)中会参与判断。KVCacheServer中不考虑比率的事情。这可能是个bug。

接下来我们再来看一下核心的回写函数CacheStringToDoFunctor::sync

void CacheStringToDoFunctor::sync(const CacheStringToDoFunctor::DataRecord &data)
{{TC_ThreadLock::Lock lock(_lock);_syncKeys.insert(data._key);++_syncCnt;}try {if (g_route_table.isTransfering(data._key)){//这里是数据迁移时的处理逻辑,不在这里讨论。直接看else if的正常处理流程}else if (g_app.gstat()->serverType() == MASTER  && _hasDb && g_route_table.isMySelf(data._key)) {//到这里可以发现,只有MASTER会处理回写请求;前提是这个数据是自己分片的(isMySelf),而且后端开启了dbint iRet = 0;bool bEx = false;try{iRet = setDb(data); // 在这里调用了对应的dbacess服务,set数据到后端数据库if (iRet != eDbSucc){TLOGERROR("CacheStringToDoFunctor::sync error, ret = " << iRet << ", key = " << data._key << ", setDb again" << endl);iRet = setDb(data);if (iRet != eDbSucc){TLOGERROR("CacheStringToDoFunctor::sync error, ret = " << iRet << ", key = " << data._key << endl);FDLOG(_dbDayLog) << "set|" << data._key << "|Err|" << iRet << endl;g_app.ppReport(PPReport::SRP_DB_ERR, 1);}else{FDLOG(_dbDayLog) << "set|" << data._key << "|Succ|" << iRet << endl;}}else{FDLOG(_dbDayLog) << "set|" << data._key << "|Succ|" << iRet << endl;}}catch (const TarsException & ex){//这里的代码省略了}}}catch (exception& e){TLOGERROR("CacheStringToDoFunctor::sync exception: " << e.what() << endl);}catch (...){TLOGERROR("CacheStringToDoFunctor::sync unknown exception" << endl);}TC_ThreadLock::Lock lock(_lock);_syncKeys.erase(data._key);
}

核心代码已添加注释。

DCache在回写脏数据时考虑的也比较全面,当DCache在进行数据迁移的时候有独立的处理逻辑,这里我们不讨论,只看正常处理逻辑。大家要牢记重点:

  1. 回写的前提是有对接后端db;
  2. 而且存在分片的情况下每个节点只能回写属于自己分片的数据、保证不冲突;
  3. 只有Master会进行回写

总结

DCache自动持久化到DB的能力,为我们技术架构的构建提供了无限的想象力。在大多数场景下我们的应用可以只对接DCache,同步数据库的操作交给DCache来完成。相较于Redis的双写操作,这大幅提升了效率,也避免了数据一致性、失败回滚等等问题。

DCache-CacheServer分析(六)相关推荐

  1. 【Java 虚拟机原理】Class 字节码二进制文件分析 六 ( 属性类型 | Code 属性 | 属性名称索引 | 属性长度 | 操作数栈最大深度 | 局部变量存储空间 | 字节码长度 )

    文章目录 前言 一.属性类型 二.Code 属性表数据结构 三.属性名称索引 四.属性长度 五.操作数栈最大深度 六.局部变量存储空间 七.字节码长度 八.存储字节码指令的一系列字节流 前言 上一篇博 ...

  2. 【生产者分析六】Kafka生产者Tips

    1.一个batch什么条件下可以发送出去 上面我们介绍了Sender线程发送那个消息的大概流程,接下来我们来分析一下一个batch的数据在什么情况下会发送出去? 回顾发送消息的时候,生产者需要指定的相 ...

  3. Tomcat启动过程源码分析六

    前言 上一篇文章中我们讨论了Catalina类中start方法中一部分,今天这篇文章我们把Catalina类的start方法剩余部分讲解完毕,在讲解代码之前我们先看之前的一篇关于ShutdownHoo ...

  4. 谷歌chrome浏览器的源码分析(六)

    消息的流通过程,是一个不同类相互交流的过程,如果不了解这个过程,根本就不知道这些类是怎么样相互协作的.由于上一次说到ViewHostMsg_RequestResource消息已经发送出来,它的处理过徎 ...

  5. Android Telephony分析(六) ---- 接口扩展(实践篇)

    本文将结合前面五篇文章所讲解的知识,综合起来,实现一个接口扩展的功能.  如果还没有阅读过前面五篇文章的内容,请先阅读:  <Android Telephony分析(一) - Phone详解 & ...

  6. 【转】ABP源码分析六:依赖注入的实现

    ABP的依赖注入的实现有一个本质两个途径:1.本质上是依赖于Castle这个老牌依赖注入的框架.2.一种实现途径是通过实现IConventionalDependencyRegistrar的实例定义注入 ...

  7. Linux 网桥代码分析 六 网桥数据转发函数分析

    对于数据包转发函数,主要是分为两大类:数据转发到指定端口.数据扩散到所有端口. 下面就从这两方面进行分析: 一  数据转发到指定端口 对于数据转发到指定端口的功能,也可以分为两个方面:对入口流量进行的 ...

  8. STA分析(六) cross talk and noise

    在深亚微米技术(deep submicron)中,关于crosstalk和noise对design的signal integrate的影响越来越大.主要表现在glitch和对delay的影响. 1)m ...

  9. 网狐框架分析六--整体框架 20150623

    --- # 1.首先得分析网狐整套流程(原版)     客户端和服务器的交互:         客户端登录过程(socket连接,服务器接收,数据库判断,返回验证消息,建立连接)         客户 ...

  10. motan源码分析六:客户端与服务器的通信层分析

    本章将分析motan的序列化和底层通信相关部分的代码. 1.在上一章中,有一个getrefers的操作,来获取所有服务器的引用,每个服务器的引用都是由DefaultRpcReferer来创建的 pub ...

最新文章

  1. 使用nsenter进入docker容器后端报错 mesg: ttyname failed: No such file or directory
  2. linux集群搭建coolrainbow,Rainbow°110408_教程▍KBFS听歌学韩语—So Cool[Rainbow]
  3. 真诚推荐几个最值得关注的前端公众号
  4. 全面解析resultType和resultMap的区别
  5. 【杂文】spring-boot报错 ~ zone value ‘Öйú±ê׼ʱ¼ä‘ is unrecognized or represents more than on time zone.
  6. oracle 12g 无监听,Oracle 12.2监听无法启动解决一例
  7. 性能测试--jmeter中XPath断言【10】
  8. Android平台的通话计时源码
  9. SharePoint 2010-在ribbon上添加表单,将默认control加到自定义group中
  10. 本地域名转向Hosts文件位置
  11. IDEA主题配置--- 炫酷的主题字体颜色设置(基于Intellij IDEA 2018)
  12. 腾讯云 直播 OBS 在线推流
  13. 百度APP“看听模式”:“AI主播”借道信息流全面落地?
  14. PASCAL VOC DATASET
  15. 十分钟带你做一个学生管理系统
  16. 胡适说:他拍的是真正如画的北京
  17. ASP.NET图片添加水印
  18. 【Pytorch】带注释的Transformer (各个部件的实现及应用实例)
  19. 云栖回顾|龙蜥社区有哪些值得回味的精彩瞬间?
  20. nginx 配置格式化工具

热门文章

  1. Web前端开发工程师实战培训教程
  2. 购物网站前端页面的制作
  3. wampserver3.1 64下载地址
  4. 交叉编译 FT4232 eeprom 开源工具 libftdi 记录
  5. html扣图标,怎么用PS抠图标?
  6. 记录4399 + 商汤笔试
  7. java 12306高并发抢票_PHP 高并发、抢票、秒杀 解决方案
  8. 控制策略模型建模规范
  9. oppo计算机的夜间模式,OPPO如何设置夜间护眼模式?OPPO手机护眼模式使用教程
  10. 爱奇艺2016在线笔试