在paxos算法中,主要包含了三个角色,proposer、accepter和learner。learner在paxos算法的论文中提及不是很详细,但是在phxpaxos实现是最为复杂的,之后独立一篇文章分析。
在微信的文档中,通过推导,一步一步得出simple-paxos正式算法的流程,推导的过程可以反复看看原先的文档(Paxos理论介绍(1): 朴素Paxos算法理论推导与证明),理解透了收益非浅。
这里主要分析phxpaxos的proposer和Acceptor代码实现算法的过程。
先看看算法的主要过程,给出的一页PPT如下:
    在phxpaxos的代码中,整理出主要流程如下图,
    在图中,左边是propoer的流程,右边是accepter的流程。其实就是最简单的方式实现了文档中paxos算法的流程。这里按照代码的主要流程来进行一步步分析。
首先,Propose的处理:
int Proposer :: NewValue(const std::string & sValue)
{
BP->GetProposerBP()->NewProposal(sValue);
if (m_oProposerState.GetValue().size() == 0)
{
m_oProposerState.SetValue(sValue);
}
m_iLastPrepareTimeoutMs = START_PREPARE_TIMEOUTMS;
m_iLastAcceptTimeoutMs = START_ACCEPT_TIMEOUTMS;
if (m_bCanSkipPrepare && !m_bWasRejectBySomeone)
{
//本节点之前已经执行过Prepare阶段,并且Prepare阶段或者Accept阶段没有被人拒绝过。。
BP->GetProposerBP()->NewProposalSkipPrepare();
PLGHead("skip prepare, directly start accept");
Accept();
}
else
{
//这里被人拒绝过就增加proposalID,否则,沿用之前的proposalID
//if not reject by someone, no need to increase ballot
Prepare(m_bWasRejectBySomeone);
}
return 0;
}
Propose的代码就是一些初始化并调用Prepare,这里有一个Multi-paxos的处理,就是有选择性的跳过Prepare,当前的proposer已经进行过了提交,并且在Prepare阶段或者Accept阶段没有被拒绝过,则跳过prepare阶段,具体的推导过程可以见(Paxos理论介绍(2): Multi-Paxos与Leader)。
接着进入Prepare,代码如下:
void Proposer :: Prepare(const bool bNeedNewBallot)
{
PLGHead("START Now.InstanceID %lu MyNodeID %lu State.ProposalID %lu State.ValueLen %zu",
GetInstanceID(), m_poConfig->GetMyNodeID(), m_oProposerState.GetProposalID(),
m_oProposerState.GetValue().size());
BP->GetProposerBP()->Prepare();
m_oTimeStat.Point();
//重置proposer的状态,退出Accept状态,进入Prepare状态
ExitAccept();
m_bIsPreparing = true;
m_bCanSkipPrepare = false;
m_bWasRejectBySomeone = false;
//是否需要重新分配ballot,被人拒绝过就需要重新分配
m_oProposerState.ResetHighestOtherPreAcceptBallot();
if (bNeedNewBallot)
{
m_oProposerState.NewPrepare();
}
PaxosMsg oPaxosMsg;
oPaxosMsg.set_msgtype(MsgType_PaxosPrepare);
oPaxosMsg.set_instanceid(GetInstanceID());
oPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());
oPaxosMsg.set_proposalid(m_oProposerState.GetProposalID());
m_oMsgCounter.StartNewRound();
//设置Prepare超时定时器
AddPrepareTimer();
PLGHead("END OK");
//发送Prepare消息。BroadcastMessage默认采用UDP方式发送,自己先执行,再发送给其他的节点
BroadcastMessage(oPaxosMsg);
}
Prepare主要判断是否需要重新分配ballot,并且增加一个Prepare定时器,超时重新进行Prepare。这里有一行代码:
m_oMsgCounter.StartNewRound();
m_oMsgCounter主要负责统计是否通过投票,每次prepare或者accept都会清空,表示新一轮投票。
接下来我们看看Accepter通过OnPrepare函数来处理Prepare的消息的代码:
int Acceptor :: OnPrepare(const PaxosMsg & oPaxosMsg)
{
PLGHead("START Msg.InstanceID %lu Msg.from_nodeid %lu Msg.ProposalID %lu",
oPaxosMsg.instanceid(), oPaxosMsg.nodeid(), oPaxosMsg.proposalid());
BP->GetAcceptorBP()->OnPrepare();
PaxosMsg oReplyPaxosMsg;
oReplyPaxosMsg.set_instanceid(GetInstanceID());
oReplyPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());
oReplyPaxosMsg.set_proposalid(oPaxosMsg.proposalid());
oReplyPaxosMsg.set_msgtype(MsgType_PaxosPrepareReply);
BallotNumber oBallot(oPaxosMsg.proposalid(), oPaxosMsg.nodeid());
if (oBallot >= m_oAcceptorState.GetPromiseBallot())
{
//Ballot大于承诺的ballot, 则触发Promise操作
PLGDebug("[Promise] State.PromiseID %lu State.PromiseNodeID %lu "
"State.PreAcceptedID %lu State.PreAcceptedNodeID %lu",
m_oAcceptorState.GetPromiseBallot().m_llProposalID,
m_oAcceptorState.GetPromiseBallot().m_llNodeID,
m_oAcceptorState.GetAcceptedBallot().m_llProposalID,
m_oAcceptorState.GetAcceptedBallot().m_llNodeID);
//设置承认的ballot
oReplyPaxosMsg.set_preacceptid(m_oAcceptorState.GetAcceptedBallot().m_llProposalID);
oReplyPaxosMsg.set_preacceptnodeid(m_oAcceptorState.GetAcceptedBallot().m_llNodeID);
if (m_oAcceptorState.GetAcceptedBallot().m_llProposalID > 0)
{
oReplyPaxosMsg.set_value(m_oAcceptorState.GetAcceptedValue());
}
//设置promise ballot 的值
m_oAcceptorState.SetPromiseBallot(oBallot);
//持久化
int ret = m_oAcceptorState.Persist(GetInstanceID(), GetLastChecksum());
if (ret != 0)
{
BP->GetAcceptorBP()->OnPreparePersistFail();
PLGErr("Persist fail, Now.InstanceID %lu ret %d",
GetInstanceID(), ret);
return -1;
}
BP->GetAcceptorBP()->OnPreparePass();
}
else
{
//Reject,并且告诉proposer拒绝他的promiseID的值
BP->GetAcceptorBP()->OnPrepareReject();
PLGDebug("[Reject] State.PromiseID %lu State.PromiseNodeID %lu",
m_oAcceptorState.GetPromiseBallot().m_llProposalID,
m_oAcceptorState.GetPromiseBallot().m_llNodeID);
oReplyPaxosMsg.set_rejectbypromiseid(m_oAcceptorState.GetPromiseBallot().m_llProposalID);
}
nodeid_t iReplyNodeID = oPaxosMsg.nodeid();
PLGHead("END Now.InstanceID %lu ReplyNodeID %lu",
GetInstanceID(), oPaxosMsg.nodeid());;
//通过UDP回消息
SendMessage(iReplyNodeID, oReplyPaxosMsg);
return 0;
}
可以对比看看算法的过程,是不是和代码的流程几乎一样,只是一个对PromiseID的判断,如果大于等于PromiseID,则Promise,否则Reject。
    接下来,Proposer收到每个Accepter发回的OnPrepareReply代码如下:
void Proposer :: OnPrepareReply(const PaxosMsg & oPaxosMsg)
{
PLGHead("START Msg.ProposalID %lu State.ProposalID %lu Msg.from_nodeid %lu RejectByPromiseID %lu",
oPaxosMsg.proposalid(), m_oProposerState.GetProposalID(),
oPaxosMsg.nodeid(), oPaxosMsg.rejectbypromiseid());
BP->GetProposerBP()->OnPrepareReply();
if (!m_bIsPreparing)
{
//不处于Preparing状态,直接退出
BP->GetProposerBP()->OnPrepareReplyButNotPreparing();
//PLGErr("Not preparing, skip this msg");
return;
}
if (oPaxosMsg.proposalid() != m_oProposerState.GetProposalID())
{
//proposalID不匹配,也直接退出
BP->GetProposerBP()->OnPrepareReplyNotSameProposalIDMsg();
//PLGErr("ProposalID not same, skip this msg");
return;
}
//消息计数器增加收到消息的节点
m_oMsgCounter.AddReceive(oPaxosMsg.nodeid());
if (oPaxosMsg.rejectbypromiseid() == 0)
{
//Promise,消息计数器中增加PromiseOrAccept的节点,
//并且将该节点的promise id、提案值更新到本地
BallotNumber oBallot(oPaxosMsg.preacceptid(), oPaxosMsg.preacceptnodeid());
PLGDebug("[Promise] PreAcceptedID %lu PreAcceptedNodeID %lu ValueSize %zu",
oPaxosMsg.preacceptid(), oPaxosMsg.preacceptnodeid(), oPaxosMsg.value().size());
m_oMsgCounter.AddPromiseOrAccept(oPaxosMsg.nodeid());
m_oProposerState.AddPreAcceptValue(oBallot, oPaxosMsg.value());
}
else
{
//Reject,消息计数器中增加Reject的节点,将m_bWasRejectBySomeone置为true,表示proposalID需要增加
//增加的大小在拒绝的accepter的promiseID的大小和本机的ProposalID的最大值加1
PLGDebug("[Reject] RejectByPromiseID %lu", oPaxosMsg.rejectbypromiseid());
m_oMsgCounter.AddReject(oPaxosMsg.nodeid());
m_bWasRejectBySomeone = true;
m_oProposerState.SetOtherProposalID(oPaxosMsg.rejectbypromiseid());
}
if (m_oMsgCounter.IsPassedOnThisRound())
{
//通过了之后,将m_bCanSkipPrepare置为true,表面可以跳过prepare状态。启动Accept流程
int iUseTimeMs = m_oTimeStat.Point();
BP->GetProposerBP()->PreparePass(iUseTimeMs);
PLGImp("[Pass] start accept, usetime %dms", iUseTimeMs);
m_bCanSkipPrepare = true;
Accept();
}
else if (m_oMsgCounter.IsRejectedOnThisRound()
|| m_oMsgCounter.IsAllReceiveOnThisRound())
{
//拒绝了之后重置prepare定时器
BP->GetProposerBP()->PrepareNotPass();
PLGImp("[Not Pass] wait 30ms and restart prepare");
AddPrepareTimer(OtherUtils::FastRand() % 30 + 10);
}
PLGHead("END");
}
这里看看AddPreAcceptValue函数:
void ProposerState :: AddPreAcceptValue(
const BallotNumber & oOtherPreAcceptBallot,
const std::string & sOtherPreAcceptValue)
{
PLGDebug("OtherPreAcceptID %lu OtherPreAcceptNodeID %lu HighestOtherPreAcceptID %lu "
"HighestOtherPreAcceptNodeID %lu OtherPreAcceptValue %zu",
oOtherPreAcceptBallot.m_llProposalID, oOtherPreAcceptBallot.m_llNodeID,
m_oHighestOtherPreAcceptBallot.m_llProposalID, m_oHighestOtherPreAcceptBallot.m_llNodeID,
sOtherPreAcceptValue.size());
if (oOtherPreAcceptBallot.isnull())
{
//如果av为空。直接返回
return;
}
if (oOtherPreAcceptBallot > m_oHighestOtherPreAcceptBallot)
{
//如果ab大于maxb,则保存maxb和value
m_oHighestOtherPreAcceptBallot = oOtherPreAcceptBallot;
m_sValue = sOtherPreAcceptValue;
}
}
在simple paxos论文对第一阶段Accept描述如下:
If an acceptor receives a prepare request with number n greater than that of any prepare request to which it has already responded, then it responds to the request with a promise not to accept any more proposals numbered less than n and with the highest-numbered proposal (if any) that it has accepted.
如果ab>maxb且av不等于null,将maxb赋值为ab并且将value赋值为av,即上文说的,被accept的最高提案值。然后当多数派promise之后就进行accept。
Accept的代码如下:
void Proposer :: Accept()
{
PLGHead("START ProposalID %lu ValueSize %zu ValueLen %zu",
m_oProposerState.GetProposalID(), m_oProposerState.GetValue().size(), m_oProposerState.GetValue().size());
BP->GetProposerBP()->Accept();
m_oTimeStat.Point();
//退出Prepare状态,进入Accept状态
ExitPrepare();
m_bIsAccepting = true;
PaxosMsg oPaxosMsg;
oPaxosMsg.set_msgtype(MsgType_PaxosAccept);
oPaxosMsg.set_instanceid(GetInstanceID());
oPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());
oPaxosMsg.set_proposalid(m_oProposerState.GetProposalID());
oPaxosMsg.set_value(m_oProposerState.GetValue());
oPaxosMsg.set_lastchecksum(GetLastChecksum());
m_oMsgCounter.StartNewRound();
//增加Accept定时器
AddAcceptTimer();
PLGHead("END");
//Udp广播Accept消息,自己最后执行
BroadcastMessage(oPaxosMsg, BroadcastMessage_Type_RunSelf_Final);
}
    
    基本流程和prepare差不多,就不多说了,看看accepter接受到accept的消息的OnAccept的代码:
void Acceptor :: OnAccept(const PaxosMsg & oPaxosMsg)
{
PLGHead("START Msg.InstanceID %lu Msg.from_nodeid %lu Msg.ProposalID %lu Msg.ValueLen %zu",
oPaxosMsg.instanceid(), oPaxosMsg.nodeid(), oPaxosMsg.proposalid(), oPaxosMsg.value().size());
BP->GetAcceptorBP()->OnAccept();
PaxosMsg oReplyPaxosMsg;
oReplyPaxosMsg.set_instanceid(GetInstanceID());
oReplyPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());
oReplyPaxosMsg.set_proposalid(oPaxosMsg.proposalid());
oReplyPaxosMsg.set_msgtype(MsgType_PaxosAcceptReply);
BallotNumber oBallot(oPaxosMsg.proposalid(), oPaxosMsg.nodeid());
if (oBallot >= m_oAcceptorState.GetPromiseBallot())
{
//Ballot大于承诺的ballot, 则触发Accept的操作
PLGDebug("[Promise] State.PromiseID %lu State.PromiseNodeID %lu "
"State.PreAcceptedID %lu State.PreAcceptedNodeID %lu",
m_oAcceptorState.GetPromiseBallot().m_llProposalID,
m_oAcceptorState.GetPromiseBallot().m_llNodeID,
m_oAcceptorState.GetAcceptedBallot().m_llProposalID,
m_oAcceptorState.GetAcceptedBallot().m_llNodeID);
//设置pb, ab和av
m_oAcceptorState.SetPromiseBallot(oBallot);
m_oAcceptorState.SetAcceptedBallot(oBallot);
m_oAcceptorState.SetAcceptedValue(oPaxosMsg.value());
//持久化
int ret = m_oAcceptorState.Persist(GetInstanceID(), GetLastChecksum());
if (ret != 0)
{
BP->GetAcceptorBP()->OnAcceptPersistFail();
PLGErr("Persist fail, Now.InstanceID %lu ret %d",
GetInstanceID(), ret);
return;
}
BP->GetAcceptorBP()->OnAcceptPass();
}
else
{
//Reject,并且告诉proposer拒绝他的promiseID的值
BP->GetAcceptorBP()->OnAcceptReject();
PLGDebug("[Reject] State.PromiseID %lu State.PromiseNodeID %lu",
m_oAcceptorState.GetPromiseBallot().m_llProposalID,
m_oAcceptorState.GetPromiseBallot().m_llNodeID);
oReplyPaxosMsg.set_rejectbypromiseid(m_oAcceptorState.GetPromiseBallot().m_llProposalID);
}
nodeid_t iReplyNodeID = oPaxosMsg.nodeid();
PLGHead("END Now.InstanceID %lu ReplyNodeID %lu",
GetInstanceID(), oPaxosMsg.nodeid());
//通过UDP回消息
SendMessage(iReplyNodeID, oReplyPaxosMsg);
}
这里也没有进行特殊的处理。在accept的处理中,如果接受了,则不能改变(当然了,如果遇到同一个instance的另一个ballot更高的提案接受,则更新为proposeid更高的提案),需要将提案值持久化,通过leveldb保存至本地数据库。
接着看看OnAcceptReplay 的代码如下:
void Proposer :: OnAcceptReply(const PaxosMsg & oPaxosMsg)
{
PLGHead("START Msg.ProposalID %lu State.ProposalID %lu Msg.from_nodeid %lu RejectByPromiseID %lu",
oPaxosMsg.proposalid(), m_oProposerState.GetProposalID(),
oPaxosMsg.nodeid(), oPaxosMsg.rejectbypromiseid());
BP->GetProposerBP()->OnAcceptReply();
if (!m_bIsAccepting)
{
//不处于Accept状态,直接退出
//PLGErr("Not proposing, skip this msg");
BP->GetProposerBP()->OnAcceptReplyButNotAccepting();
return;
}
if (oPaxosMsg.proposalid() != m_oProposerState.GetProposalID())
{
//proposalID不匹配,也直接退出
//PLGErr("ProposalID not same, skip this msg");
BP->GetProposerBP()->OnAcceptReplyNotSameProposalIDMsg();
return;
}
m_oMsgCounter.AddReceive(oPaxosMsg.nodeid());
if (oPaxosMsg.rejectbypromiseid() == 0)
{
//Accept,消息计数器中增加PromiseOrAccept的节点(m_oMsgCounter在每一轮Prepare或者Accept都会重置)
PLGDebug("[Accept]");
m_oMsgCounter.AddPromiseOrAccept(oPaxosMsg.nodeid());
}
else
{
//Reject,消息计数器中增加Reject的节点,将m_bWasRejectBySomeone置为true,表示proposalID需要增加
//增加的大小在拒绝的accepter的promiseID的大小和本机的ProposalID的最大值加1
PLGDebug("[Reject]");
m_oMsgCounter.AddReject(oPaxosMsg.nodeid());
m_bWasRejectBySomeone = true;
m_oProposerState.SetOtherProposalID(oPaxosMsg.rejectbypromiseid());
}
if (m_oMsgCounter.IsPassedOnThisRound())
{
//多数派同意了提案,退出Accept状态,发送实例中提案被chosen的消息
int iUseTimeMs = m_oTimeStat.Point();
BP->GetProposerBP()->AcceptPass(iUseTimeMs);
PLGImp("[Pass] Start send learn, usetime %dms", iUseTimeMs);
ExitAccept();
m_poLearner->ProposerSendSuccess(GetInstanceID(), m_oProposerState.GetProposalID());
}
else if (m_oMsgCounter.IsRejectedOnThisRound()
|| m_oMsgCounter.IsAllReceiveOnThisRound())
{
//失败,重置Accept定时器,Accept重新从Prepare开始,如果instance增加了则忽略提案
BP->GetProposerBP()->AcceptNotPass();
PLGImp("[Not pass] wait 30ms and Restart prepare");
AddAcceptTimer(OtherUtils::FastRand() % 30 + 10);
}
PLGHead("END");
}
        
    在OnAcceptReply和OnPrepareReply的消息中,如果遇到不在预期状态的回复时,直接忽略。

phxpaxos的Proposer和Acceptor的流程相关推荐

  1. 通过 PhxPaxos 了解 Paxos 原理

    通过 PhxPaxos 了解 Paxos 原理 Prepare阶段 Prepare // src/algorithm/proposer.cppvoid Proposer :: Prepare(cons ...

  2. PhxPaxos源码分析之(3)提案发起篇(Paxos协议核心)

    更多 blog 见: https://joeylichang.github.io/ 本篇内容根据Paxos协议分五部分介绍,即发起Prpare请求.给Prepare请求投票.收集Prepare投票,接 ...

  3. Paxos和Raft的前世今生

    前言 在保证数据安全的基础上,保持服务的持续可用,是核心业务对底层数据存储系统的基本要求.业界常见的1主N备的方案面临的问题是"最大可用(Maximum Availability)" ...

  4. 跟着微信后台团队学习分布式一致性协议

    目录 什么是Paxos 一致性协议 分布式环境 提议者 Paxos是用来干什么的 确定一个值 确定多个值 有序的确定多个值 实例的对齐(Learn) 如何应用Paxos 状态机 工程化 我们需要多个角 ...

  5. 区块链共识机制:分布式系统的Paxos协议

    前言:第一次接触paxos可能很多人不理解这玩意儿有啥用,近几天一直在研究paxos,不敢说理解的多到位,但是把自己理解的记录下来,供大家参考.文章主要参考知行学社的<分布式系统与Paxos算法 ...

  6. 如何实现一个 Paxos

    Paxos 作为一个经典的分布式一致性算法(Consensus Algorithm),在各种教材中也被当做范例来讲解.但由于其抽象性,很少有人基于朴素 Paxos 开发一致性库,而 RAFT 则是工业 ...

  7. 深度介绍分布式系统原理与设计

    点击上方蓝色"方志朋",选择"设为星标" 回复"666"获取独家整理的学习资料! 1 概念 1.1 模型 1.2 副本 1.3 衡量分布式系 ...

  8. 分布式概念-分布式事务,并发处理协议

    点击上方蓝色字体,选择"设为星标" 优质文章,及时送达 提到分布式系统,分布式事务是经常被大家提起的话题,也是经常在我们编码或是系统设计时遇到的问题,很常见. 如果让大家说一种解决 ...

  9. 分布式一致性协议paxos

    Paxos协议/算法是分布式系统中比较重要的协议,它有多重要呢? <分布式系统的事务处理>: Google Chubby的作者Mike Burrows说过这个世界上只有一种一致性算法,那就 ...

最新文章

  1. Windows7 64bit VS2013 Caffe test MNIST操作步骤
  2. 中山大学2016年硕士研究生入学考试复试基本分数线
  3. python计算平方用map函数_python的map函数的使用方法详解以及使用案例(处理每个元素的自增、自减、平方等)...
  4. 简单约瑟夫环问题解法汇总(模拟/数论)
  5. 第七节:WebApi与Unity整合进行依赖注入和AOP的实现
  6. 饭卡可以用水冲洗吗_薄壁不锈钢水管真的可以满足大众用水健康管道的要求吗?...
  7. python批量下载文件教程_Python抓包菜鸟教程:批量下载图片的方法,电脑和手机都能用...
  8. Oracle——集合运算
  9. java中main函数的值是_java基础-main方法
  10. HTML5中Audio使用踩坑汇总
  11. 过拟合的含义、出现原因及解决方案
  12. 干货 | E-Prime实验数据处理之E-Merge妙招,确定不来看吗?
  13. cisco 思科三层交换机配置命令
  14. 林淮川孙玄:分布式锁选型背后的架构设计思维【附源码】
  15. ECharts中Y轴坐标上标记有实心圆
  16. linux系统键盘关机快捷键,Linux三种关机/重启系统的命令
  17. SCS【2】单细胞转录组 之 cellranger
  18. VR医疗平台Osso VR完成6600万美元C轮融资
  19. 全球人工智能工程师 冬令营火热招生ing
  20. Photoshop CS3专家讲堂视频教程(10月21日更新到106课)

热门文章

  1. 采用_beginthread/_beginthreadex函数创建多线程
  2. 随机点菜demo(纯HTML、CSS、JS)
  3. 深入理解UE4宏定义—— GENERATED_BODY
  4. Android开发最新所有框架总结
  5. 央行提醒:远离“征信修复”骗局
  6. WordPress编辑器支持Word文档导入
  7. linux 微代码下载,英特尔放出Linux微代码以修复Meltdown和Spectre漏洞
  8. 为什么要找正规的量化交易平台开户?
  9. python-豆瓣电影 Top 250,电影名称,年份,评论人数,评分
  10. 关于Python的mock