更多 blog 见: https://joeylichang.github.io/

本篇内容根据Paxos协议分五部分介绍,即发起Prpare请求、给Prepare请求投票、收集Prepare投票,接收Accept请求 && 统计Acceot结果、Learn阶段,最后总从InstatnceID维度再次理解一下Mutil-Paxos协议,内容比较多。

接上篇提议申请成功之后调用AddNotify通知主循环发起提案,主循环收到通知之后调用CheckNewValue,开始真正的发起提案。

  • 发起Prpare请求

void Instance :: CheckNewValue()
{// 检查是否是一次新的提交// IsNewCommit()判断的标准是InstanceID是否为-1,且ProseValue是否为空,一轮提议结束之后会清空这两个值// 此时 InstanceID 还没有确定,ProseValue根据Paxos协议需要在Prepare之后才能确定if (!m_oCommitCtx.IsNewCommit()){return;}// 确认Learner学习到最新的数据,防止发起的提案是旧的轮次// IsIMLatest()判断标准是Learner.InstanceID + 1 >= Learner.m_llHighestSeenInstanceIDif (!m_oLearner.IsIMLatest()){return;}if (m_poConfig->IsIMFollower()){PLGErr("I'm follower, skip this new value");m_oCommitCtx.SetResultOnlyRet(PaxosTryCommitRet_Follower_Cannot_Commit);return;}// 当前节点是否加入了集群中if (!m_poConfig->CheckConfig()){PLGErr("I'm not in membership, skip this new value");m_oCommitCtx.SetResultOnlyRet(PaxosTryCommitRet_Im_Not_In_Membership);return;}if ((int)m_oCommitCtx.GetCommitValue().size() > MAX_VALUE_SIZE){PLGErr("value size %zu to large, skip this new value",m_oCommitCtx.GetCommitValue().size());m_oCommitCtx.SetResultOnlyRet(PaxosTryCommitRet_Value_Size_TooLarge);return;}// 给提案设置InstanceID,用Proposer视角下最大的InstanceIDm_oCommitCtx.StartCommit(m_oProposer.GetInstanceID());// 如果设置了超时时间,给提案设置定时器// 超时之后无重试直接将Proposer 的 InstanceID返回给clientif (m_oCommitCtx.GetTimeoutMs() != -1){m_oIOLoop.AddTimer(m_oCommitCtx.GetTimeoutMs(), Timer_Instance_Commit_Timeout, m_iCommitTimerID);}// 时间打点m_oTimeStat.Point();// 新加入节点,此时发起加入集群的请求,用的是SystemVSM,后面文章介绍if (m_poConfig->GetIsUseMembership()&& (m_oProposer.GetInstanceID() == 0 || m_poConfig->GetGid() == 0)){//Init system variables.PLGHead("Need to init system variables, Now.InstanceID %lu Now.Gid %lu", m_oProposer.GetInstanceID(), m_poConfig->GetGid());uint64_t llGid = OtherUtils::GenGid(m_poConfig->GetMyNodeID());string sInitSVOpValue;int ret = m_poConfig->GetSystemVSM()->CreateGid_OPValue(llGid, sInitSVOpValue);assert(ret == 0);m_oSMFac.PackPaxosValue(sInitSVOpValue, m_poConfig->GetSystemVSM()->SMID());m_oProposer.NewValue(sInitSVOpValue);}else{// 发起提案,本篇文章重点介绍if (m_oOptions.bOpenChangeValueBeforePropose) {m_oSMFac.BeforePropose(m_poConfig->GetMyGroupIdx(), m_oCommitCtx.GetCommitValue());}m_oProposer.NewValue(m_oCommitCtx.GetCommitValue());}
}

CheckNewValue重点关注几个函数:

1. m_oCommitCtx.IsNewCommit:判断这个是不是一个新的提案,如果上一个提案还没有结束不能发起。

2. m_oLearner.IsIMLatest:保证目前数据已经是最新的,可以开始发起提案。

3.m_oCommitCtx.StartCommit:用Proposer视角下最大的InstanceID作为提案号,根据Paxos协议内容比较好理解。

4.m_oProposer.NewValue:准备发起Prepare请求。

下面我们先看一下提案超时(m_oIOLoop.AddTimer)之后是怎么处理的,然后再看一下NewValue如何发起Prepare。

void Instance :: OnNewValueCommitTimeout()
{BP->GetInstanceBP()->OnNewValueCommitTimeout();// 删除定时器,更新Preparing状态为falsem_oProposer.ExitPrepare();m_oProposer.ExitAccept();// 更新CommitCtx中状态为commit超时、instance以及值为空,InstanceID会返回给clientm_oCommitCtx.SetResult(PaxosTryCommitRet_Timeout, m_oProposer.GetInstanceID(), "");
}
int Proposer :: NewValue(const std::string & sValue)
{BP->GetProposerBP()->NewProposal(sValue);// 发起提案的值可能不是用户本次调用Propose接口传入的值// ProposerState值不为空表示之前的提案还没有结束,应该继续完成// 这里第一次看的时候很难理解,设想上一轮提案已经被多数accept,// 下一个请求来了,由于之前数据被accept了,下一轮的prepare返回数值是空(Paxos协议),// 当前节点用本次的数据发起提案,那么之前的数据造成了丢失// 总结一下:ProposerState.GetValue不为空表示提案未决,继续用这个提案if (m_oProposerState.GetValue().size() == 0){m_oProposerState.SetValue(sValue);}// LargeBufferMode : 两个超时时间为15s// 正常情况下:START_PREPARE_TIMEOUTM = 2s,START_ACCEPT_TIMEOUTMS = 1sm_iLastPrepareTimeoutMs = START_PREPARE_TIMEOUTMS;m_iLastAcceptTimeoutMs = START_ACCEPT_TIMEOUTMS;// m_bCanSkipPrepare 表示上一次提案prepare请求被允许了// m_bWasRejectBySomeone 表示当前节点的提案没有任何一个节点拒绝// 满足这两个条件,即可以跳过prepare阶段// 这正是Mutil-Paxos的思想if (m_bCanSkipPrepare && !m_bWasRejectBySomeone){BP->GetProposerBP()->NewProposalSkipPrepare();PLGHead("skip prepare, directly start accept");Accept();}else{//if not reject by someone, no need to increase ballotPrepare(m_bWasRejectBySomeone);}return 0;
}

NewValue需要重点理解对于悬而未决提案如何处理,以及Mutil-Paxos(这里只是一个开始后面还有相关逻辑)。下面看一下Prepare做了什么。

void Proposer :: Prepare(const bool bNeedNewBallot)
{PLGHead("START Now.InstanceID %lu MyNodeID %lu State.ProposalID %lu State.ValueLen %zu",GetInstanceID(), m_poConfig->GetMyNodeID(), m_oProposerState.GetProposalID(),m_oProposerState.GetValue().size());BP->GetProposerBP()->Prepare();m_oTimeStat.Point();// 在Mutil-Paxos中,大多数情况下是不会进行prepare阶段// 只有在第一次 或者 有新Proposer发起天,这个时候需要重置下面的标志ExitAccept();m_bIsPreparing = true;m_bCanSkipPrepare = false;m_bWasRejectBySomeone = false;// 清空投票信息m_oProposerState.ResetHighestOtherPreAcceptBallot();if (bNeedNewBallot){// 更新 m_llProposalID // m_llProposalID 可以理解为Proposer更换时的InstanceIDm_oProposerState.NewPrepare();}PaxosMsg oPaxosMsg;oPaxosMsg.set_msgtype(MsgType_PaxosPrepare);oPaxosMsg.set_instanceid(GetInstanceID());oPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());oPaxosMsg.set_proposalid(m_oProposerState.GetProposalID());// 开启新的投票轮次m_oMsgCounter.StartNewRound();AddPrepareTimer();PLGHead("END OK");BroadcastMessage(oPaxosMsg);
}

Prepare可以理解为新的Proposer初始化阶段,这里面需要重点理解ProposerState的m_llProposalID,ProposalID可以理解为更换Proposer时InstanceID + 1,用它进行争取选票,如果熟悉RedisCluster,类似currentEpoch和configEpoch很容易理解。

  • 给Prepare请求投票

下面看一下收到Prepare请求之后的代码和时序图:

int Instance :: ReceiveMsgForAcceptor(const PaxosMsg & oPaxosMsg, const bool bIsRetry)
{if (m_poConfig->IsIMFollower()){PLGErr("I'm follower, skip this message");return 0;}//if (oPaxosMsg.instanceid() != m_oAcceptor.GetInstanceID()){BP->GetInstanceBP()->OnReceivePaxosAcceptorMsgInotsame();}// 加快learn速度,大1表示上一轮提案已经结束// 如果是prepare请求,应该是相等,因为每一轮投票结束之后// proposer/acceptor的InstanceID都会+1保持一致// 这个逻辑只是用来加快learn速度,在learn到还差一个数据时可以加快速度,进入正常的投票状态// 后面文章会介绍learn逻辑,之后会好理解if (oPaxosMsg.instanceid() == m_oAcceptor.GetInstanceID() + 1){//skip success messagePaxosMsg oNewPaxosMsg = oPaxosMsg;oNewPaxosMsg.set_instanceid(m_oAcceptor.GetInstanceID());oNewPaxosMsg.set_msgtype(MsgType_PaxosLearner_ProposerSendSuccess);ReceiveMsgForLearner(oNewPaxosMsg);}// 表示是统一轮次的投票    if (oPaxosMsg.instanceid() == m_oAcceptor.GetInstanceID()){if (oPaxosMsg.msgtype() == MsgType_PaxosPrepare){return m_oAcceptor.OnPrepare(oPaxosMsg);}else if (oPaxosMsg.msgtype() == MsgType_PaxosAccept){m_oAcceptor.OnAccept(oPaxosMsg);}}// 表示当前节点落后,会加如主循环的重试队列中进行重试// 重试队列中的消息可以保证重试的消息是按InstanceID递增进行的else if ((!bIsRetry) && (oPaxosMsg.instanceid() > m_oAcceptor.GetInstanceID())){//retry msg can't retry again.if (oPaxosMsg.instanceid() >= m_oLearner.GetSeenLatestInstanceID()){if (oPaxosMsg.instanceid() < m_oAcceptor.GetInstanceID() + RETRY_QUEUE_MAX_LEN){//need retry msg precondition//1. prepare or accept msg//2. msg.instanceid > nowinstanceid. //    (if < nowinstanceid, this msg is expire)//3. msg.instanceid >= seen latestinstanceid. //    (if < seen latestinstanceid, proposer don't need reply with this instanceid anymore.)//4. msg.instanceid close to nowinstanceid.m_oIOLoop.AddRetryPaxosMsg(oPaxosMsg);BP->GetInstanceBP()->OnReceivePaxosAcceptorMsgAddRetry();//PLGErr("InstanceID not same, get in to retry logic");}else{//retry msg not series, no use.m_oIOLoop.ClearRetryQueue();}}}return 0;
}

接下来重点看一下m_oAcceptor.OnPrepare(oPaxosMsg)逻辑:

int Acceptor :: OnPrepare(const PaxosMsg & oPaxosMsg)
{PLGHead("START Msg.InstanceID %lu Msg.from_nodeid %lu Msg.ProposalID %lu",oPaxosMsg.instanceid(), oPaxosMsg.nodeid(), oPaxosMsg.proposalid());BP->GetAcceptorBP()->OnPrepare();// 这里重点说一下proposalid// 它用来表示本次投票是给proposalid对应的proposer// 如前面介绍的每次更新proposer会提升InstanceID给proposalidPaxosMsg oReplyPaxosMsg;oReplyPaxosMsg.set_instanceid(GetInstanceID());oReplyPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());oReplyPaxosMsg.set_proposalid(oPaxosMsg.proposalid());oReplyPaxosMsg.set_msgtype(MsgType_PaxosPrepareReply);// 投票BallotNumber oBallot(oPaxosMsg.proposalid(), oPaxosMsg.nodeid());// BallotNumber重载了运算符 >= ,先比较proposalid 再比较nodeid// 等于表示proposer没有变化,大于表示变更了proposer并且比之前的proposer新if (oBallot >= m_oAcceptorState.GetPromiseBallot()){PLGDebug("[Promise] State.PromiseID %lu State.PromiseNodeID %lu ""State.PreAcceptedID %lu State.PreAcceptedNodeID %lu",m_oAcceptorState.GetPromiseBallot().m_llProposalID, m_oAcceptorState.GetPromiseBallot().m_llNodeID,m_oAcceptorState.GetAcceptedBallot().m_llProposalID,m_oAcceptorState.GetAcceptedBallot().m_llNodeID);// pre的信息用于节点之间交互一些信息// 比如Paxos节点间最大的ProposalID、NodeID,保证每个节点获取的是最新的且一致的        oReplyPaxosMsg.set_preacceptid(m_oAcceptorState.GetAcceptedBallot().m_llProposalID);oReplyPaxosMsg.set_preacceptnodeid(m_oAcceptorState.GetAcceptedBallot().m_llNodeID);// Paxos协议如果accept值不为空,将value返回// 每轮提议结束,这些信息都会被清空(后面会介绍)if (m_oAcceptorState.GetAcceptedBallot().m_llProposalID > 0){oReplyPaxosMsg.set_value(m_oAcceptorState.GetAcceptedValue());}m_oAcceptorState.SetPromiseBallot(oBallot);// acctpet的结果必须持久化成功,防止进程重启int ret = m_oAcceptorState.Persist(GetInstanceID(), GetLastChecksum());if (ret != 0){BP->GetAcceptorBP()->OnPreparePersistFail();PLGErr("Persist fail, Now.InstanceID %lu ret %d",GetInstanceID(), ret);return -1;}BP->GetAcceptorBP()->OnPreparePass();}// 表示ProposalID小于当前节点,即之前的prosepor,直接忽略else{BP->GetAcceptorBP()->OnPrepareReject();PLGDebug("[Reject] State.PromiseID %lu State.PromiseNodeID %lu", m_oAcceptorState.GetPromiseBallot().m_llProposalID, m_oAcceptorState.GetPromiseBallot().m_llNodeID);oReplyPaxosMsg.set_rejectbypromiseid(m_oAcceptorState.GetPromiseBallot().m_llProposalID);}nodeid_t iReplyNodeID = oPaxosMsg.nodeid();PLGHead("END Now.InstanceID %lu ReplyNodeID %lu",GetInstanceID(), oPaxosMsg.nodeid());;SendMessage(iReplyNodeID, oReplyPaxosMsg);return 0;
}

目前Paxos第一阶段给Prepare投票的源码已经介绍完了,下面看一下Proposer接收投票之后的处理逻辑

  • 收集Prepare投票

ReceiveMsgForProposer只做了投票是否过期的判断(instanceID),直接看m_oProposer.OnPrepareReply做了什么:

void Proposer :: OnPrepareReply(const PaxosMsg & oPaxosMsg)
{PLGHead("START Msg.ProposalID %lu State.ProposalID %lu Msg.from_nodeid %lu RejectByPromiseID %lu",oPaxosMsg.proposalid(), m_oProposerState.GetProposalID(), oPaxosMsg.nodeid(), oPaxosMsg.rejectbypromiseid());BP->GetProposerBP()->OnPrepareReply();if (!m_bIsPreparing){BP->GetProposerBP()->OnPrepareReplyButNotPreparing();//PLGErr("Not preparing, skip this msg");return;}// 两个几点的proposalid不同即,发起的proposer不一致if (oPaxosMsg.proposalid() != m_oProposerState.GetProposalID()){BP->GetProposerBP()->OnPrepareReplyNotSameProposalIDMsg();//PLGErr("ProposalID not same, skip this msg");return;}// 统计收到的票数m_oMsgCounter.AddReceive(oPaxosMsg.nodeid());// 对端接收了Prepare邮票if (oPaxosMsg.rejectbypromiseid() == 0){BallotNumber oBallot(oPaxosMsg.preacceptid(), oPaxosMsg.preacceptnodeid());PLGDebug("[Promise] PreAcceptedID %lu PreAcceptedNodeID %lu ValueSize %zu", oPaxosMsg.preacceptid(), oPaxosMsg.preacceptnodeid(), oPaxosMsg.value().size());// 记录同意的票数m_oMsgCounter.AddPromiseOrAccept(oPaxosMsg.nodeid());// 重点:Paxos协议中,如果收到的prepare回答不为空,选择最大的InstanceID的valuem_oProposerState.AddPreAcceptValue(oBallot, oPaxosMsg.value());}else{PLGDebug("[Reject] RejectByPromiseID %lu", oPaxosMsg.rejectbypromiseid());// 记录拒绝票数m_oMsgCounter.AddReject(oPaxosMsg.nodeid());// 表示下次的提案不能跳过prepare阶段——mutil-paxos内容m_bWasRejectBySomeone = true;// 更新当前paxos组的promiseid(如果大于当前节点记录的promiseid)m_oProposerState.SetOtherProposalID(oPaxosMsg.rejectbypromiseid());}// 如果超过半数节点同意,直接发起Accept请求if (m_oMsgCounter.IsPassedOnThisRound()){int iUseTimeMs = m_oTimeStat.Point();BP->GetProposerBP()->PreparePass(iUseTimeMs);PLGImp("[Pass] start accept, usetime %dms", iUseTimeMs);// 表示下轮提议可以跳过Prepare阶段——Mutiil-Paxos协议m_bCanSkipPrepare = true;Accept();}// 超过半数投票失败 || 全部投票结束else if (m_oMsgCounter.IsRejectedOnThisRound()|| m_oMsgCounter.IsAllReceiveOnThisRound()){BP->GetProposerBP()->PrepareNotPass();PLGImp("[Not Pass] wait 30ms and restart prepare");AddPrepareTimer(OtherUtils::FastRand() % 30 + 10);}PLGHead("END");
}

OnPrepareReply中需要重点关注收到投票之后,对本地节点记录的一个Paxos节点之间信息的记录,代表了当前Paxos组的状态。

下面分别先看一下收集结超过半数投票失败 或者 全部投票结束还没有成功的逻辑:

void Proposer :: AddPrepareTimer(const int iTimeoutMs)
{if (m_iPrepareTimerID > 0){m_poIOLoop->RemoveTimer(m_iPrepareTimerID);}if (iTimeoutMs > 0){m_poIOLoop->AddTimer(iTimeoutMs,Timer_Proposer_Prepare_Timeout,m_iPrepareTimerID);return;}m_poIOLoop->AddTimer(m_iLastPrepareTimeoutMs,Timer_Proposer_Prepare_Timeout,m_iPrepareTimerID);// 记录instanceID,用于轮次判断m_llTimeoutInstanceID = GetInstanceID();PLGHead("timeoutms %d", m_iLastPrepareTimeoutMs);// 扩大超时重试m_iLastPrepareTimeoutMs *= 2;if (m_iLastPrepareTimeoutMs > MAX_PREPARE_TIMEOUTMS){m_iLastPrepareTimeoutMs = MAX_PREPARE_TIMEOUTMS;}
}void Proposer :: OnPrepareTimeout()
{PLGHead("OK");// 校验InstanceID是否一致if (GetInstanceID() != m_llTimeoutInstanceID){PLGErr("TimeoutInstanceID %lu not same to NowInstanceID %lu, skip",m_llTimeoutInstanceID, GetInstanceID());return;}BP->GetProposerBP()->PrepareTimeout();// 重试Prepare(m_bWasRejectBySomeone);
}

接下来看一下收到半数同意投票之后,发起Accept的逻辑:

void Proposer :: Accept()
{PLGHead("START ProposalID %lu ValueSize %zu ValueLen %zu", m_oProposerState.GetProposalID(), m_oProposerState.GetValue().size(), m_oProposerState.GetValue().size());BP->GetProposerBP()->Accept();m_oTimeStat.Point();// 删除prepare定时器和prepare标志位ExitPrepare();m_bIsAccepting = true;PaxosMsg oPaxosMsg;oPaxosMsg.set_msgtype(MsgType_PaxosAccept);oPaxosMsg.set_instanceid(GetInstanceID());oPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());oPaxosMsg.set_proposalid(m_oProposerState.GetProposalID());// 此时的value 已经是确定了的本轮发起提议的value// 可能是其他节点中返回Prepare中InstanceID最大的数据// 可能是client传入的数据,可能是上一轮proposer发起但是悬而未决的数据oPaxosMsg.set_value(m_oProposerState.GetValue());oPaxosMsg.set_lastchecksum(GetLastChecksum());// 清空投票结果m_oMsgCounter.StartNewRound();// 添加定时器AddAcceptTimer();PLGHead("END");// 广播accept消息BroadcastMessage(oPaxosMsg, BroadcastMessage_Type_RunSelf_Final);
}// acctpt超时从prapare开始重试
void Proposer :: OnAcceptTimeout()
{PLGHead("OK");if (GetInstanceID() != m_llTimeoutInstanceID){PLGErr("TimeoutInstanceID %lu not same to NowInstanceID %lu, skip",m_llTimeoutInstanceID, GetInstanceID());return;}BP->GetProposerBP()->AcceptTimeout();// 重新发起preparePrepare(m_bWasRejectBySomeone);
}
  • 接收Accept请求 && 统计Acceot结果

ReceiveMsgForAcceptor之前已经分析过了,下面直接进入m_oAcceptor.OnAccept:

void Acceptor :: OnAccept(const PaxosMsg & oPaxosMsg)
{PLGHead("START Msg.InstanceID %lu Msg.from_nodeid %lu Msg.ProposalID %lu Msg.ValueLen %zu",oPaxosMsg.instanceid(), oPaxosMsg.nodeid(), oPaxosMsg.proposalid(), oPaxosMsg.value().size());BP->GetAcceptorBP()->OnAccept();PaxosMsg oReplyPaxosMsg;oReplyPaxosMsg.set_instanceid(GetInstanceID());oReplyPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());oReplyPaxosMsg.set_proposalid(oPaxosMsg.proposalid());oReplyPaxosMsg.set_msgtype(MsgType_PaxosAcceptReply);BallotNumber oBallot(oPaxosMsg.proposalid(), oPaxosMsg.nodeid());// 投票的标准与Prepare一致// 这里多说一句,根据Paxos协议内容,Prepare应该接受提案号更大的提案二不能是等于或者小于// 但是prepare和accept都是大于等于,因为这里用了Mutil-Paxos,投票判断的标准是proposalid// 而不是InstanceID,如果proposalid 大于等于本地的proposalid,// 同一个proposalid的Instace一定没问题的if (oBallot >= m_oAcceptorState.GetPromiseBallot()){PLGDebug("[Promise] State.PromiseID %lu State.PromiseNodeID %lu ""State.PreAcceptedID %lu State.PreAcceptedNodeID %lu",m_oAcceptorState.GetPromiseBallot().m_llProposalID, m_oAcceptorState.GetPromiseBallot().m_llNodeID,m_oAcceptorState.GetAcceptedBallot().m_llProposalID,m_oAcceptorState.GetAcceptedBallot().m_llNodeID);// 记录了接收数据的节点信息,这个很重要m_oAcceptorState.SetPromiseBallot(oBallot);m_oAcceptorState.SetAcceptedBallot(oBallot);m_oAcceptorState.SetAcceptedValue(oPaxosMsg.value());// accept必须落盘且成功int ret = m_oAcceptorState.Persist(GetInstanceID(), GetLastChecksum());if (ret != 0){BP->GetAcceptorBP()->OnAcceptPersistFail();PLGErr("Persist fail, Now.InstanceID %lu ret %d",GetInstanceID(), ret);return;}BP->GetAcceptorBP()->OnAcceptPass();}else{BP->GetAcceptorBP()->OnAcceptReject();PLGDebug("[Reject] State.PromiseID %lu State.PromiseNodeID %lu", m_oAcceptorState.GetPromiseBallot().m_llProposalID, m_oAcceptorState.GetPromiseBallot().m_llNodeID);oReplyPaxosMsg.set_rejectbypromiseid(m_oAcceptorState.GetPromiseBallot().m_llProposalID);}nodeid_t iReplyNodeID = oPaxosMsg.nodeid();PLGHead("END Now.InstanceID %lu ReplyNodeID %lu",GetInstanceID(), oPaxosMsg.nodeid());SendMessage(iReplyNodeID, oReplyPaxosMsg);
}

下面看一下Proposer接收Accept应答之后的逻辑:

void Proposer :: OnAcceptReply(const PaxosMsg & oPaxosMsg)
{PLGHead("START Msg.ProposalID %lu State.ProposalID %lu Msg.from_nodeid %lu RejectByPromiseID %lu",oPaxosMsg.proposalid(), m_oProposerState.GetProposalID(), oPaxosMsg.nodeid(), oPaxosMsg.rejectbypromiseid());BP->GetProposerBP()->OnAcceptReply();if (!m_bIsAccepting){//PLGErr("Not proposing, skip this msg");BP->GetProposerBP()->OnAcceptReplyButNotAccepting();return;}if (oPaxosMsg.proposalid() != m_oProposerState.GetProposalID()){//PLGErr("ProposalID not same, skip this msg");BP->GetProposerBP()->OnAcceptReplyNotSameProposalIDMsg();return;}m_oMsgCounter.AddReceive(oPaxosMsg.nodeid());if (oPaxosMsg.rejectbypromiseid() == 0){PLGDebug("[Accept]");m_oMsgCounter.AddPromiseOrAccept(oPaxosMsg.nodeid());}else{PLGDebug("[Reject]");m_oMsgCounter.AddReject(oPaxosMsg.nodeid());m_bWasRejectBySomeone = true;m_oProposerState.SetOtherProposalID(oPaxosMsg.rejectbypromiseid());}if (m_oMsgCounter.IsPassedOnThisRound()){int iUseTimeMs = m_oTimeStat.Point();BP->GetProposerBP()->AcceptPass(iUseTimeMs);PLGImp("[Pass] Start send learn, usetime %dms", iUseTimeMs);ExitAccept();m_poLearner->ProposerSendSuccess(GetInstanceID(), m_oProposerState.GetProposalID());}else if (m_oMsgCounter.IsRejectedOnThisRound()|| m_oMsgCounter.IsAllReceiveOnThisRound()){BP->GetProposerBP()->AcceptNotPass();PLGImp("[Not pass] wait 30ms and Restart prepare");AddAcceptTimer(OtherUtils::FastRand() % 30 + 10);}PLGHead("END");
}

整个逻辑核统计Prepare投票逻辑基本一致,需要注意的地方有两个:第一是,本轮投票确认失败之后AddAcceptTimer逻辑与之前介绍的一样从Prepare开始重试。第二是Accept之后直接进行Learn。下面开始看一下Learn的逻辑

  • Learn阶段

进入Learn阶段表述多数节点已经Accepted数据,把数据交给状态机,状态机一般是用户根据各自的需求自由扩展(继承相同的接口)。

下面开始看一下Learn的代码:

void Learner :: ProposerSendSuccess(const uint64_t llLearnInstanceID,const uint64_t llProposalID)
{BP->GetLearnerBP()->ProposerSendSuccess();PaxosMsg oPaxosMsg;oPaxosMsg.set_msgtype(MsgType_PaxosLearner_ProposerSendSuccess);oPaxosMsg.set_instanceid(llLearnInstanceID);oPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());oPaxosMsg.set_proposalid(llProposalID);oPaxosMsg.set_lastchecksum(GetLastChecksum());//run self first// 先进行本机学习,无需网络开销BroadcastMessage(oPaxosMsg, BroadcastMessage_Type_RunSelf_First);
}

无论是本机学习还是发送给其他节点,都进入ReceiveMsgForLearner函数,下面重点看一下它:

int Instance :: ReceiveMsgForLearner(const PaxosMsg & oPaxosMsg)
{if (oPaxosMsg.msgtype() == MsgType_PaxosLearner_AskforLearn){m_oLearner.OnAskforLearn(oPaxosMsg);}else if (oPaxosMsg.msgtype() == MsgType_PaxosLearner_SendLearnValue){m_oLearner.OnSendLearnValue(oPaxosMsg);}// MsgType_PaxosLearner_ProposerSendSuccess 是Accept之后学习的消息// 其他的类型都是追数据或者ckpt的消息,后面文章会介绍// OnProposerSendSuccess 更新了learn的一些数据和信息,后面看一下// 重点看一下当前函数的后半段else if (oPaxosMsg.msgtype() == MsgType_PaxosLearner_ProposerSendSuccess){m_oLearner.OnProposerSendSuccess(oPaxosMsg);}else if (oPaxosMsg.msgtype() == MsgType_PaxosLearner_SendNowInstanceID){m_oLearner.OnSendNowInstanceID(oPaxosMsg);}else if (oPaxosMsg.msgtype() == MsgType_PaxosLearner_ComfirmAskforLearn){m_oLearner.OnComfirmAskForLearn(oPaxosMsg);}else if (oPaxosMsg.msgtype() == MsgType_PaxosLearner_SendLearnValue_Ack){m_oLearner.OnSendLearnValue_Ack(oPaxosMsg);}else if (oPaxosMsg.msgtype() == MsgType_PaxosLearner_AskforCheckpoint){m_oLearner.OnAskforCheckpoint(oPaxosMsg);}// OnProposerSendSuccess中设置过了if (m_oLearner.IsLearned()){BP->GetInstanceBP()->OnInstanceLearned();SMCtx * poSMCtx = nullptr;// 判断学习的值是否是当前节点Proposer提出的bool bIsMyCommit = m_oCommitCtx.IsMyCommit(m_oLearner.GetInstanceID(), m_oLearner.GetLearnValue(), poSMCtx);if (!bIsMyCommit){BP->GetInstanceBP()->OnInstanceLearnedNotMyCommit();PLGDebug("this value is not my commit");}else{int iUseTimeMs = m_oTimeStat.Point();BP->GetInstanceBP()->OnInstanceLearnedIsMyCommit(iUseTimeMs);PLGHead("My commit ok, usetime %dms", iUseTimeMs);}// 这里是重点,执行了用户的状态机// 表示经过Paxos选举的值确定了并且执行了用户的逻辑,可以认为所有的节点目前的状态是一致的if (!SMExecute(m_oLearner.GetInstanceID(), m_oLearner.GetLearnValue(), bIsMyCommit, poSMCtx)){BP->GetInstanceBP()->OnInstanceLearnedSMExecuteFail();PLGErr("SMExecute fail, instanceid %lu, not increase instanceid", m_oLearner.GetInstanceID());m_oCommitCtx.SetResult(PaxosTryCommitRet_ExecuteFail, m_oLearner.GetInstanceID(), m_oLearner.GetLearnValue());m_oProposer.CancelSkipPrepare();return -1;}{// 设置最终的执行结果,用过调用的接口最后回去这里取数据//this paxos instance end, tell proposal donem_oCommitCtx.SetResult(PaxosTryCommitRet_OK, m_oLearner.GetInstanceID(), m_oLearner.GetLearnValue());if (m_iCommitTimerID > 0){m_oIOLoop.RemoveTimer(m_iCommitTimerID);}}PLGHead("[Learned] New paxos starting, Now.Proposer.InstanceID %lu ""Now.Acceptor.InstanceID %lu Now.Learner.InstanceID %lu",m_oProposer.GetInstanceID(), m_oAcceptor.GetInstanceID(), m_oLearner.GetInstanceID());PLGHead("[Learned] Checksum change, last checksum %u new checksum %u",m_iLastChecksum, m_oLearner.GetNewChecksum());m_iLastChecksum = m_oLearner.GetNewChecksum();// 这里也是重点,在这里Proposer、Acceptor、Learn中的InstanceID++// AcceptorState、ProposerState、LearnerState中的数据、投票信息都会清空// 所以理想情况下Proposer、Acceptor、Learn中的InstanceID应该是一致的NewInstance();PLGHead("[Learned] New paxos instance has started, Now.Proposer.InstanceID %lu ""Now.Acceptor.InstanceID %lu Now.Learner.InstanceID %lu",m_oProposer.GetInstanceID(), m_oAcceptor.GetInstanceID(), m_oLearner.GetInstanceID());m_oCheckpointMgr.SetMaxChosenInstanceID(m_oAcceptor.GetInstanceID());BP->GetInstanceBP()->NewInstance();}return 0;
}

最后看一下,OnProposerSendSuccess对Learner做了什么改动:

void Learner :: OnProposerSendSuccess(const PaxosMsg & oPaxosMsg)
{BP->GetLearnerBP()->OnProposerSendSuccess();PLGHead("START Msg.InstanceID %lu Now.InstanceID %lu Msg.ProposalID %lu State.AcceptedID %lu ""State.AcceptedNodeID %lu, Msg.from_nodeid %lu",oPaxosMsg.instanceid(), GetInstanceID(), oPaxosMsg.proposalid(), m_poAcceptor->GetAcceptorState()->GetAcceptedBallot().m_llProposalID,m_poAcceptor->GetAcceptorState()->GetAcceptedBallot().m_llNodeID, oPaxosMsg.nodeid());// 表示不是一个轮次的投票if (oPaxosMsg.instanceid() != GetInstanceID()){//Instance id not same, that means not in the same instance, ignord.PLGDebug("InstanceID not same, skip msg");return;}if (m_poAcceptor->GetAcceptorState()->GetAcceptedBallot().isnull()){//Not accept any yet.BP->GetLearnerBP()->OnProposerSendSuccessNotAcceptYet();PLGDebug("I haven't accpeted any proposal");return;}BallotNumber oBallot(oPaxosMsg.proposalid(), oPaxosMsg.nodeid());// 判断学习的诗句是否值之前投票信息if (m_poAcceptor->GetAcceptorState()->GetAcceptedBallot()!= oBallot){//Proposalid not same, this accept value maybe not chosen value.PLGDebug("ProposalBallot not same to AcceptedBallot");BP->GetLearnerBP()->OnProposerSendSuccessBallotNotSame();return;}//learn value.// 更新m_oLearnerState信息m_oLearnerState.LearnValueWithoutWrite(oPaxosMsg.instanceid(),m_poAcceptor->GetAcceptorState()->GetAcceptedValue(),m_poAcceptor->GetAcceptorState()->GetChecksum());BP->GetLearnerBP()->OnProposerSendSuccessSuccessLearn();PLGHead("END Learn value OK, value %zu", m_poAcceptor->GetAcceptorState()->GetAcceptedValue().size());TransmitToFollower();
}
  • 理解

本篇文章比较长,涉及到一些Mutil-Pasox协议的内容,与Paxos在实现上有些许出入,可能需要反复看几遍才能理解。其中Proposer、Acceptor、Learner都有自己的state结构体,以及InstanceID还有一个ProposalID,我们接下来从ID这个维度梳理一下Mutil-Paxos协议。

  1. InistanceID有三个,Proposer-InstanceID、Acceptor-InstanceID、Learner-InstanceID。Proposer、Accept、Learn 全部继承自Base类,Base类中维护InstanceID,在一轮提议结束之后会调用基类的NewInstance,Instance++。
  2. Proposer-InstanceID、Acceptor-InstanceID、Learner-InstanceID相等,并且理想情况下所有节点的Instance也相等。
  3. ProposalID:标识Proposer,每次Proposer变更(可以理解为切主),会取当前的InstanceID++;

有了上述的基础认知,我们看一下在整个提案流程中,用到的InstanceID:

  1. 提议之前会校验(Learner-InstanceID+ 1 >= m_llHighestSeenInstanceID),以当前节点视角看发起提案的提案号是否是Paxos几个组中所有节点上InstanceID最大的提案,否则不能发起提案。
  2. 发送Prepare请求:
    1. 如果接收的Proposer-InstanceID == 本机Acceptor-InstanceID + 1,直接ReceiveMsgForLearner,然后继续。表示当前节点落后proposer一个轮次,如果Leaner-InstanceID正好相等表示这就是learn要学习的最后一个数据,直接学习——算是一个工程优化。

    2. 如果接收的Proposer-InstanceID > 本机Acceptor-InstanceID,加入重试队列(表示非本轮提议)。

    3. 如果接收的Proposer-InstanceID < 本机Acceptor-InstanceID,忽略。

    4. 如果接收的Proposer-InstanceID == 本机Acceptor-InstanceID,投票逻辑(检查ProposalID)。

  3. 收集Prepare结果:

    1. 如果Proposer-InstanceID != 返回的InstanceID,表示非本轮提议,那么该Proposer提议过期。

    2. 根据ProposalID判断是否是之前的Proposer发起(校验主是否变化),决定投票结果。

  4. 发送Accept请求(同发送Prepare请求)

  5. 手机Accept结果(同收集Prepare结果)

  6. Learner数据:

    1. 校验Learner-InstanceID与消息中InstanceID是否一致,是一个轮次的

    2. 调用NewInstance():

      1. Proposer-InstanceID++、Acceptor-InstanceID++、Learner-InstanceID++。

      2. Proposer:清空计数器,清空prepare、accept定时器、ProposerState全部清空(值、m_llHighestOtherProposalID、投票信息),除了m_llProposalID,用于记录之前的Proposer(即当前的主)。

      3. Acceptor:清空投票、值、checksum,不清空m_oPromiseBallot。

      4. Learner:清空值、状态、checksum(全部状态信息)。

PhxPaxos源码分析之(3)提案发起篇(Paxos协议核心)相关推荐

  1. thinkphp源码分析(二)—入口篇

    源码分析---入口篇 源码分析 应用入口 用户发起的请求都会经过应用的入口文件,通常是 ==public/index.php==文件.当然,你也可以更改或者增加新的入口文件. 通常入口文件的代码都比较 ...

  2. Mybatis源码分析第一天------Mybatis实用篇

    Mybatis源码分析第一天------Mybatis实用篇 一切最基本的操作就是参考官方文档:https://mybatis.org/mybatis-3/zh/configuration.html ...

  3. 【投屏】Scrcpy源码分析四(最终章 - Server篇)

    Scrcpy源码分析系列 [投屏]Scrcpy源码分析一(编译篇) [投屏]Scrcpy源码分析二(Client篇-连接阶段) [投屏]Scrcpy源码分析三(Client篇-投屏阶段) [投屏]Sc ...

  4. 一步步实现windows版ijkplayer系列文章之三——Ijkplayer播放器源码分析之音视频输出——音频篇

    https://www.cnblogs.com/harlanc/p/9693983.html 目录 OpenSL ES & AudioTrack 源码分析 创建播放器音频输出对象 配置并创建音 ...

  5. 一步步实现windows版ijkplayer系列文章之二——Ijkplayer播放器源码分析之音视频输出——视频篇...

    一步步实现windows版ijkplayer系列文章之一--Windows10平台编译ffmpeg 4.0.2,生成ffplay 一步步实现windows版ijkplayer系列文章之二--Ijkpl ...

  6. PhxPaxos源码分析:网络

    2019独角兽企业重金招聘Python工程师标准>>> 欢迎大家前往腾讯云社区,获取更多腾讯海量技术实践干货哦~ 作者:LBD 了解分布式系统的童鞋肯定听过Paxos算法的大名.Pa ...

  7. PhxPaxos源码分析——网络

    了解分布式系统的童鞋肯定听过Paxos算法的大名.Paxos算法以晦涩难懂著称,其工程实现更难.目前,号称在工程上实现了Paxos算法的应该只有Google.阿里和腾讯.然而,只有腾讯的微信团队真正将 ...

  8. thinkphp源码分析(一)—开门篇

    源码分析-开门篇 thinkphp生命周期 1.入口文件 用户发起的请求都会经过应用的入口文件,通常是 ==public/index.php==文件.当然,你也可以更改或者增加新的入口文件. 通常入口 ...

  9. 「Leakcanary 源码分析」看这一篇就够了

    image.png 「Leakcanary 」是我们经常用于检测内存泄漏的工具,简单的使用方式,内存泄漏的可视化,是我们开发中必备的工具之一. 分析源码之前 Leakcanary 大神的 github ...

  10. UnityStandardAsset工程、源码分析_2_赛车游戏[玩家控制]_车辆核心控制

    上一章地址:UnityStandardAsset工程.源码分析_1_赛车游戏[玩家控制]_输入系统 在上一章里,我们了解了整个车辆控制的大体流程,并且分析了一下输入系统,也就是从玩家的手柄\手机倾斜输 ...

最新文章

  1. Http Tunnel 小记
  2. Gradle常用配置-版本号自增
  3. threejs设置对象层次
  4. Python网络编程基础
  5. Virtual Box下配置Host-Only联网方式详解
  6. CCF201809-2 买菜(100分)【序列处理+差分】
  7. oracle用户sde老是锁定,关于ArcGIS10.0版本的SDE密码修改,账户锁定,SDE服务启动又停止等问题的解决...
  8. LINUX自带库与系统重名怎么办
  9. 6m缓存和8m缓存差距_i79700和i78700性能差距有多大?i79700和i78700区别对比评测
  10. 蝌蚪网课助手mac_疫情期间如何录网课?(干货教程)手把手教你录出高质量网课。...
  11. delphi 操作excel 复选框
  12. 链新:探索NFT中国化路径,与实体经济相结合
  13. 如何在简历中使用STAR法则
  14. 两种“猿” 两个生活
  15. 计算机考研怎么计划,计算机考研复习计划怎么制定
  16. touchGFX综合学习五、touchGFX加载外部(SDCARD、SPI FLASH等)字体显示,包括中文
  17. 统计输入数据的个数、求和、平均值、方差、中位数
  18. 【查找算法】折半查找法
  19. 【阅读】A Comprehensive Survey on Electronic Design Automation and Graph Neural Networks——EDA+GNN综述翻译
  20. 除去设备驱动中的腾讯视频

热门文章

  1. 2018年“双十一”“双十二”线上消费投诉较去年增长近八成
  2. sm羞耻任务_代码简介:我仍然为自己感到羞耻的代码
  3. 基于复杂网络的软件网络研究
  4. 深入 JavaScript 设计模式,从此有了优化代码的理论依据
  5. 【我的第一份开发工作】1.找工作前的经历
  6. 汽车模具设计以后的前途怎么样?
  7. 前端面试——浏览器存储浏览器缓存(http缓存机制)
  8. 幸福人生没有捷径可走
  9. 2700分+!统信UOS+龙芯3A4000最新Unixbench跑分出炉
  10. 注册商标申请费用降低了吗