通过 PhxPaxos 了解 Paxos 原理

Prepare阶段

Prepare

// src/algorithm/proposer.cppvoid Proposer :: Prepare(const bool bNeedNewBallot)
{PLGHead("START Now.InstanceID %lu MyNodeID %lu State.ProposalID %lu State.ValueLen %zu",GetInstanceID(), m_poConfig->GetMyNodeID(), m_oProposerState.GetProposalID(),m_oProposerState.GetValue().size());BP->GetProposerBP()->Prepare();m_oTimeStat.Point();ExitAccept();//表明Proposer正处于Prepare阶段m_bIsPreparing = true;//不能跳过Prepare阶段m_bCanSkipPrepare = false;//目前还未被任意一个Acceptor拒绝m_bWasRejectBySomeone = false;m_oProposerState.ResetHighestOtherPreAcceptBallot();//如果需要产生新的投票,就调用NewPrepare产生新的ProposalID,新的ProposalID为当前已知的最大ProposalID+1if (bNeedNewBallot){m_oProposerState.NewPrepare();}PaxosMsg oPaxosMsg;//设置Prepare消息的各个字段oPaxosMsg.set_msgtype(MsgType_PaxosPrepare);oPaxosMsg.set_instanceid(GetInstanceID());oPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());oPaxosMsg.set_proposalid(m_oProposerState.GetProposalID());//MsgCount是专门用来统计票数的,根据计算的结果确定是否通过Prepare阶段或者Accept阶段m_oMsgCounter.StartNewRound();//Prepare超时定时器AddPrepareTimer();PLGHead("END OK");//将Prepare消息发送到各个节点BroadcastMessage(oPaxosMsg);
}

Proposer在Prepare阶段主要做了这么几件事:

  • 重置各个状态位,表明当前正处于Prepare阶段。

  • 获取提案编号ProposalID。当bNeedNewBallot为true时需要将ProposalID+1。否则沿用之前的ProposalID。bNeedNewBallot是在NewValue中调用Prepare方法时传入的m_bWasRejectBySomeone参数。也就是如果之前没有被任何Acceptor拒绝(说明还没有明确出现更大的ProposalID),则不需要获取新的ProposalID。对应的场景是Prepare阶段超时了,在超时时间内没有收到过半Acceptor同意的消息,因此需要重新执行Prepare阶段,此时只需要沿用原来的ProposalID即可。

  • 发送Prepare请求。该请求PaxosMsg是Protocol Buffer定义的一个message,包含MsgType、InstanceID、NodeID、ProposalID等字段。在BroadcastMessage(oPaxosMsg)中还会将oPaxosMsg序列化后才发送出去。

PaxosMsg的定义如下,Prepare和Accept阶段Proposer和Acceptor的所有消息都用PaxosMsg来表示:

// src/comm/paxos_msg.protomessage PaxosMsg
{required int32 MsgType = 1;optional uint64 InstanceID = 2;optional uint64 NodeID = 3;optional uint64 ProposalID = 4;optional uint64 ProposalNodeID = 5;optional bytes Value = 6;optional uint64 PreAcceptID = 7;optional uint64 PreAcceptNodeID = 8;optional uint64 RejectByPromiseID = 9;optional uint64 NowInstanceID = 10;optional uint64 MinChosenInstanceID = 11;optional uint32 LastChecksum = 12;optional uint32 Flag = 13;optional bytes SystemVariables = 14;optional bytes MasterVariables = 15;
};

OnPrepareReply

Proposer发出Prepare请求后就开始等待Acceptor的回复。当Proposer所在节点收到PaxosPrepareReply消息后,就会调用Proposer的OnPrepareReply(oPaxosMsg),其中oPaxosMsg是Acceptor回复的消息。

// src/algorithm/proposer.cppvoid Proposer :: OnPrepareReply(const PaxosMsg & oPaxosMsg)
{PLGHead("START Msg.ProposalID %lu State.ProposalID %lu Msg.from_nodeid %lu RejectByPromiseID %lu",oPaxosMsg.proposalid(), m_oProposerState.GetProposalID(), oPaxosMsg.nodeid(), oPaxosMsg.rejectbypromiseid());BP->GetProposerBP()->OnPrepareReply();//如果Proposer不是在Prepare阶段,则忽略该消息if (!m_bIsPreparing){BP->GetProposerBP()->OnPrepareReplyButNotPreparing();//PLGErr("Not preparing, skip this msg");return;}//如果ProposalID不同,也忽略if (oPaxosMsg.proposalid() != m_oProposerState.GetProposalID()){BP->GetProposerBP()->OnPrepareReplyNotSameProposalIDMsg();//PLGErr("ProposalID not same, skip this msg");return;}//加入一个收到的消息,用于MsgCounter统计m_oMsgCounter.AddReceive(oPaxosMsg.nodeid());//如果该消息不是拒绝,即Acceptor同意本次Prepare请求if (oPaxosMsg.rejectbypromiseid() == 0){BallotNumber oBallot(oPaxosMsg.preacceptid(), oPaxosMsg.preacceptnodeid());PLGDebug("[Promise] PreAcceptedID %lu PreAcceptedNodeID %lu ValueSize %zu", oPaxosMsg.preacceptid(), oPaxosMsg.preacceptnodeid(), oPaxosMsg.value().size());//加入MsgCounter用于统计投票m_oMsgCounter.AddPromiseOrAccept(oPaxosMsg.nodeid());//将Acceptor返回的它接受过的编号最大的提案记录下来(如果有的话),用于确定Accept阶段的Valuem_oProposerState.AddPreAcceptValue(oBallot, oPaxosMsg.value());}//Acceptor拒绝了Prepare请求else{PLGDebug("[Reject] RejectByPromiseID %lu", oPaxosMsg.rejectbypromiseid());//同样也要记录到MsgCounter用于统计投票m_oMsgCounter.AddReject(oPaxosMsg.nodeid());//记录被Acceptor拒绝过,待会儿如果重新进入Prepare阶段的话就需要获取更大的ProposalIDm_bWasRejectBySomeone = true;//记录下别的Proposer提出的更大的ProposalID。这样重新发起Prepare请求时才知道需要用多大的ProposalIDm_oProposerState.SetOtherProposalID(oPaxosMsg.rejectbypromiseid());}//本次Prepare请求通过了。也就是得到了半数以上Acceptor的同意if (m_oMsgCounter.IsPassedOnThisRound()){int iUseTimeMs = m_oTimeStat.Point();BP->GetProposerBP()->PreparePass(iUseTimeMs);PLGImp("[Pass] start accept, usetime %dms", iUseTimeMs);m_bCanSkipPrepare = true;//进入Accept阶段Accept();}//本次Prepare请求没有通过else if (m_oMsgCounter.IsRejectedOnThisRound()|| m_oMsgCounter.IsAllReceiveOnThisRound()){BP->GetProposerBP()->PrepareNotPass();PLGImp("[Not Pass] wait 30ms and restart prepare");//随机等待一段时间后重新发起Prepare请求AddPrepareTimer(OtherUtils::FastRand() % 30 + 10);}PLGHead("END");
}

该阶段Proposer主要做了以下事情:

  • 判断消息是否有效。包括ProposalID是否相同,自身是否处于Prepare阶段等。因为网络是不可靠的,有些消息可能延迟很久,等收到的时候已经不需要了,所以需要做这些判断。

  • 将收到的消息加入MsgCounter用于统计。

  • 根据收到的消息更新自身状态。包括Acceptor承诺过的ProposalID,以及Acceptor接受过的编号最大的提案等。

  • 根据MsgCounter统计的Acceptor投票结果决定是进入Acceptor阶段还是重新发起Prepare请求。这里如果判断需要重新发起Prepare请求的话,也不是立即进行,而是等待一段随机的时间,这样做的好处是减少不同Proposer之间的冲突,采取的策略跟raft中leader选举冲突时在一段随机的选举超时时间后重新发起选举的做法类似。

注:这里跟Paxos算法中提案编号对应的并不是ProposalID,而是BallotNumber。BallotNumber由ProposalID和NodeID组成。还实现了运算符重载。如果ProposalID大,则BallotNumber(即提案编号)大。在ProposalID相同的情况下,NodeID大的BallotNumber大。

Accept 阶段

接下来Proposer就进入Accept阶段:

Accept

// src/algorithm/proposer.cppvoid Proposer :: Accept()
{PLGHead("START ProposalID %lu ValueSize %zu ValueLen %zu", m_oProposerState.GetProposalID(), m_oProposerState.GetValue().size(), m_oProposerState.GetValue().size());BP->GetProposerBP()->Accept();m_oTimeStat.Point();ExitPrepare();m_bIsAccepting = true;//设置Accept请求的消息内容PaxosMsg oPaxosMsg;oPaxosMsg.set_msgtype(MsgType_PaxosAccept);oPaxosMsg.set_instanceid(GetInstanceID());oPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());oPaxosMsg.set_proposalid(m_oProposerState.GetProposalID());oPaxosMsg.set_value(m_oProposerState.GetValue());oPaxosMsg.set_lastchecksum(GetLastChecksum());m_oMsgCounter.StartNewRound();AddAcceptTimer();PLGHead("END");//发给各个节点BroadcastMessage(oPaxosMsg, BroadcastMessage_Type_RunSelf_Final);
}

Accept请求中 PaxosMsg里的Value是这样确定的:如果Prepare阶段有Acceptor的回复中带有提案值,则该Value为所有的Acceptor的回复中,编号最大的提案的值。否则就是Proposer在最初调用NewValue时传入的值。

OnAcceptReply

// src/algorithm/proposer.cppvoid Proposer :: OnAcceptReply(const PaxosMsg & oPaxosMsg)
{PLGHead("START Msg.ProposalID %lu State.ProposalID %lu Msg.from_nodeid %lu RejectByPromiseID %lu",oPaxosMsg.proposalid(), m_oProposerState.GetProposalID(), oPaxosMsg.nodeid(), oPaxosMsg.rejectbypromiseid());BP->GetProposerBP()->OnAcceptReply();if (!m_bIsAccepting){//PLGErr("Not proposing, skip this msg");BP->GetProposerBP()->OnAcceptReplyButNotAccepting();return;}if (oPaxosMsg.proposalid() != m_oProposerState.GetProposalID()){//PLGErr("ProposalID not same, skip this msg");BP->GetProposerBP()->OnAcceptReplyNotSameProposalIDMsg();return;}m_oMsgCounter.AddReceive(oPaxosMsg.nodeid());if (oPaxosMsg.rejectbypromiseid() == 0){PLGDebug("[Accept]");m_oMsgCounter.AddPromiseOrAccept(oPaxosMsg.nodeid());}else{PLGDebug("[Reject]");m_oMsgCounter.AddReject(oPaxosMsg.nodeid());m_bWasRejectBySomeone = true;m_oProposerState.SetOtherProposalID(oPaxosMsg.rejectbypromiseid());}if (m_oMsgCounter.IsPassedOnThisRound()){int iUseTimeMs = m_oTimeStat.Point();BP->GetProposerBP()->AcceptPass(iUseTimeMs);PLGImp("[Pass] Start send learn, usetime %dms", iUseTimeMs);ExitAccept();//让Leaner学习被选定(Chosen)的值m_poLearner->ProposerSendSuccess(GetInstanceID(), m_oProposerState.GetProposalID());}else if (m_oMsgCounter.IsRejectedOnThisRound()|| m_oMsgCounter.IsAllReceiveOnThisRound()){BP->GetProposerBP()->AcceptNotPass();PLGImp("[Not pass] wait 30ms and Restart prepare");AddAcceptTimer(OtherUtils::FastRand() % 30 + 10);}PLGHead("END");
}

这里跟OnPrepareReply的过程基本一致。比较大的区别在于最后如果过半的Acceptor接受了该Accept请求,则说明该Value被选定(Chosen)了,就发送消息,让每个节点上的Learner学习该Value。

Acceptor

OnPrepare

OnPrepare用于处理收到的Prepare请求,逻辑如下:

int Acceptor :: OnPrepare(const PaxosMsg & oPaxosMsg)
{PLGHead("START Msg.InstanceID %lu Msg.from_nodeid %lu Msg.ProposalID %lu",oPaxosMsg.instanceid(), oPaxosMsg.nodeid(), oPaxosMsg.proposalid());BP->GetAcceptorBP()->OnPrepare();PaxosMsg oReplyPaxosMsg;oReplyPaxosMsg.set_instanceid(GetInstanceID());oReplyPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());oReplyPaxosMsg.set_proposalid(oPaxosMsg.proposalid());oReplyPaxosMsg.set_msgtype(MsgType_PaxosPrepareReply);//构造接收到的Prepare请求里的提案编号BallotNumber oBallot(oPaxosMsg.proposalid(), oPaxosMsg.nodeid());//提案编号大于承诺过的提案编号if (oBallot >= m_oAcceptorState.GetPromiseBallot()){PLGDebug("[Promise] State.PromiseID %lu State.PromiseNodeID %lu ""State.PreAcceptedID %lu State.PreAcceptedNodeID %lu",m_oAcceptorState.GetPromiseBallot().m_llProposalID, m_oAcceptorState.GetPromiseBallot().m_llNodeID,m_oAcceptorState.GetAcceptedBallot().m_llProposalID,m_oAcceptorState.GetAcceptedBallot().m_llNodeID);//返回之前接受过的提案的编号oReplyPaxosMsg.set_preacceptid(m_oAcceptorState.GetAcceptedBallot().m_llProposalID);oReplyPaxosMsg.set_preacceptnodeid(m_oAcceptorState.GetAcceptedBallot().m_llNodeID);//如果接受过的提案编号大于0(<=0说明没有接受过提案),则设置接受过的提案的Valueif (m_oAcceptorState.GetAcceptedBallot().m_llProposalID > 0){oReplyPaxosMsg.set_value(m_oAcceptorState.GetAcceptedValue());}//更新承诺的提案编号为新的提案编号(因为新的提案编号更大)m_oAcceptorState.SetPromiseBallot(oBallot);//信息持久化int ret = m_oAcceptorState.Persist(GetInstanceID(), GetLastChecksum());if (ret != 0){BP->GetAcceptorBP()->OnPreparePersistFail();PLGErr("Persist fail, Now.InstanceID %lu ret %d",GetInstanceID(), ret);return -1;}BP->GetAcceptorBP()->OnPreparePass();}//提案编号小于承诺过的提案编号,需要拒绝else{BP->GetAcceptorBP()->OnPrepareReject();PLGDebug("[Reject] State.PromiseID %lu State.PromiseNodeID %lu", m_oAcceptorState.GetPromiseBallot().m_llProposalID, m_oAcceptorState.GetPromiseBallot().m_llNodeID);//拒绝该Prepare请求,并返回承诺过的ProposalID      oReplyPaxosMsg.set_rejectbypromiseid(m_oAcceptorState.GetPromiseBallot().m_llProposalID);}nodeid_t iReplyNodeID = oPaxosMsg.nodeid();PLGHead("END Now.InstanceID %lu ReplyNodeID %lu",GetInstanceID(), oPaxosMsg.nodeid());;//向发出Prepare请求的Proposer回复消息SendMessage(iReplyNodeID, oReplyPaxosMsg);return 0;
}

OnAccept

再来看看OnAccept:

void Acceptor :: OnAccept(const PaxosMsg & oPaxosMsg)
{PLGHead("START Msg.InstanceID %lu Msg.from_nodeid %lu Msg.ProposalID %lu Msg.ValueLen %zu",oPaxosMsg.instanceid(), oPaxosMsg.nodeid(), oPaxosMsg.proposalid(), oPaxosMsg.value().size());BP->GetAcceptorBP()->OnAccept();PaxosMsg oReplyPaxosMsg;oReplyPaxosMsg.set_instanceid(GetInstanceID());oReplyPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());oReplyPaxosMsg.set_proposalid(oPaxosMsg.proposalid());oReplyPaxosMsg.set_msgtype(MsgType_PaxosAcceptReply);BallotNumber oBallot(oPaxosMsg.proposalid(), oPaxosMsg.nodeid());//提案编号不小于承诺过的提案编号(注意:这里是“>=”,而再OnPrepare中是“>”,可以先思考下为什么),需要接受该提案if (oBallot >= m_oAcceptorState.GetPromiseBallot()){PLGDebug("[Promise] State.PromiseID %lu State.PromiseNodeID %lu ""State.PreAcceptedID %lu State.PreAcceptedNodeID %lu",m_oAcceptorState.GetPromiseBallot().m_llProposalID, m_oAcceptorState.GetPromiseBallot().m_llNodeID,m_oAcceptorState.GetAcceptedBallot().m_llProposalID,m_oAcceptorState.GetAcceptedBallot().m_llNodeID);//更新承诺的提案编号;接受的提案编号、提案值m_oAcceptorState.SetPromiseBallot(oBallot);m_oAcceptorState.SetAcceptedBallot(oBallot);m_oAcceptorState.SetAcceptedValue(oPaxosMsg.value());//信息持久化int ret = m_oAcceptorState.Persist(GetInstanceID(), GetLastChecksum());if (ret != 0){BP->GetAcceptorBP()->OnAcceptPersistFail();PLGErr("Persist fail, Now.InstanceID %lu ret %d",GetInstanceID(), ret);return;}BP->GetAcceptorBP()->OnAcceptPass();}//需要拒绝该提案else{BP->GetAcceptorBP()->OnAcceptReject();PLGDebug("[Reject] State.PromiseID %lu State.PromiseNodeID %lu", m_oAcceptorState.GetPromiseBallot().m_llProposalID, m_oAcceptorState.GetPromiseBallot().m_llNodeID);//拒绝的消息中附上承诺过的ProposalIDoReplyPaxosMsg.set_rejectbypromiseid(m_oAcceptorState.GetPromiseBallot().m_llProposalID);}nodeid_t iReplyNodeID = oPaxosMsg.nodeid();PLGHead("END Now.InstanceID %lu ReplyNodeID %lu",GetInstanceID(), oPaxosMsg.nodeid());//将响应发送给ProposerSendMessage(iReplyNodeID, oReplyPaxosMsg);
}

通过 PhxPaxos 了解 Paxos 原理相关推荐

  1. 【转载】架构师需要了解的Paxos原理、历程及实战

    原文链接,请参见:http://weibo.com/ttarticle/p/show?id=2309403952892003376258 数据库高可用性难题 数据库的数据一致和持续可用对电子商务和互联 ...

  2. phxpaxos编译示例

    官网:Tencent/phxpaxos: The Paxos library implemented in C++ that has been used in the WeChat productio ...

  3. Zookeeper分布式一致性原理(二):一致性协议

    为了解决分布式一致性问题,在长期的研究过程中,提出了一大批经典的一致性协议和算法,其中最著名的就是2PC和3PC以及Paxos算法了. 1. 2PC和3PC 在分布式系统中,每个节点都明确知道自己事务 ...

  4. 分布式事务之底层原理揭秘

    , hi 大家好,今天分享一这篇文章,让大家彻底了解分布式原理,这个是后台开发必须掌握技能. 刚性事务 柔性事务 本地事务 分布式事务 单阶段原子提交协议 两阶段提交协议 定义 原理 性能 恢复 缺陷 ...

  5. 分布式之系统底层原理

    作者:allanpan,腾讯 IEG 高级后台工程师 导言 分布式事务是分布式系统必不可少的组成部分,基本上只要实现一个分布式系统就逃不开对分布式事务的支持.本文从分布式事务这个概念切入,尝试对分布式 ...

  6. Paxos协议基本原理

    /* 版权声明:可以任意转载,转载时请标明文章原始出处和作者信息 .*/ author: 张俊林 本文节选自<大数据日知录:架构与算法>第二章"数据复制与一致性" |2 ...

  7. 一致性协议Paxos详解(一):Basic Paxos协议详解

    一致性协议Paxos详解(一):Basic Paxos协议详解 前言 Paxos是什么 Paxos算法原理与推导 Basic Paxos Proposal Numbers prepare阶段 prep ...

  8. Paxos协议超级详细解释+简单实例

    Basic-Paxos算法(可以先看后面的实际例子再看前面的具体介绍部分) Paxos算法的目的 Paxos算法的目的是为了解决分布式环境下一致性的问题. 多个节点并发操纵数据,如何保证在读写过程中数 ...

  9. 分布式系统中的一致性协议

    本文详细介绍目前分布式系统中常见的一些一致性协议:两阶段提交协议,三阶段提交协议,向量时钟,RWN协议,paxos协议,Raft协议.下面就一个个详细讲解下. 一. 两阶段提交协议(2PC) 两阶段提 ...

最新文章

  1. 如果在CSDN博文编辑状态下获得博文最终网络链接?
  2. Android图片轮播
  3. 关于MonoDevelop自动缩进的设置
  4. 【Spring】使用Spring和AMQP发送接收消息(下)
  5. Pygame最小开发框架
  6. SQL性能健康检查脚本
  7. ADO 动态链接数据库
  8. Swift - 通过url地址打开web页面
  9. 360怎么看电脑配置_电脑配置清单速查表-AMD
  10. Win10保护眼睛豆沙绿背景
  11. 【强大的数字设计工具包】Sketch 55.1 for Mac
  12. php12生肖是哪个,十二生肖对应的数字
  13. 库卡机器人会卡顿吗_看完你就知道德国库卡机器人到底有多牛!
  14. JS案例学习——随机点名案例
  15. 为什么你该学习编程了?
  16. 三星 linux手机系统版本,WindowsMobile操作系统手机版本分类对应机型
  17. 终身伴侣(两个人的网站)代码+效果演示(文末源码地址)
  18. 玩转代码|异步加载 CSS 的最简单方法
  19. office2013中word设置标题自动编号
  20. 推荐一个一键AI抠图网站

热门文章

  1. 2018年全国及31省市数据中心相关政策汇总及解读「全」
  2. 谷歌数据中心采用机器人销毁硬盘驱动器
  3. php 怎么查看原生方法源码_怎么看电脑内存频率?这里有3种方法可以查看,新手分享...
  4. mro python_用python实现MRO算法
  5. 成功解决torch\cuda\__init__.py“, line 208, in check_error raise Cuda Error(res) torch.cuda.Cuda Error: C
  6. Python:利用原生函数count或正则表达式compile、findall、finditer实现匹配统计(包括模糊匹配的贪婪匹配、懒惰匹配)
  7. DL之LSTM:基于tensorflow框架利用LSTM算法对气温数据集训练并回归预测
  8. 成功解决TypeError: slice indices must be integers or None or have an __index__ method
  9. DL之SSD:SSD算法的简介(论文介绍)、架构详解、案例应用等配图集合之详细攻略
  10. EL之GB(GBC):利用GB对多分类问题进行建模(分层抽样+调1参)并评估