文章目录

  • 5. Preprepare消息的接收以及Prepare消息的发送
  • 6. Prepare消息的接收以及Commit消息的发送

5. Preprepare消息的接收以及Prepare消息的发送

本系列的第一篇博客介绍了源代码的获取和测试用例的选择,第二篇博客介绍了Primary节点对repBatch的处理,以及对PrePrepare消息的发送。强烈建议读者先阅读上述两篇博客,再来阅读当前这篇。

为方便读者查阅,本系列博客的链接整理如下:

  • Fabric中PBFT源码解读 (1)
  • Fabric中PBFT源码解读 (2)
  • Fabric中PBFT源码解读 (3)
  • Fabric中PBFT源码解读 (4)
  • Fabric中PBFT源码解读 (5)

4.2小节介绍了节点0Preprepare消息的发送过程,本小节介绍其他节点(以节点1为例)接收Preprepare消息的过程和发送Prepare消息的过程。
由4.2小节的最后介绍可知,节点0Preprepare消息最终发送到了节点1events通道中。
此外,4.2小节中提到每个节点会启动一个协程运行eventloop函数。为唤起读者的记忆,eventLoop函数的代码再次摘录如下。

// [pbft-core_mock_test.go] makePBFTNetwork -> [events.go] Start -> eventLoop
func (em *managerImpl) eventLoop() {for {select {case next := <-em.events:em.Inject(next)...}}
}

eventLoop函数进一步调用Inject函数、SendEvent函数、pbftCore.ProcessEvent函数。以上都和节点0接收reqBatch消息时的流程相同,以下从pbftCore.ProcessEvent函数开始介绍。

// [pbft-core.go] ProcessEvent
func (instance *pbftCore) ProcessEvent(e events.Event) events.Event {...switch et := e.(type) {...case *PrePrepare:err = instance.recvPrePrepare(et)...}
}// [pbft-core.go] ProcessEvent -> recvPrePrepare
func (instance *pbftCore) recvPrePrepare(preprep *PrePrepare) error {...if !instance.inWV(preprep.View, preprep.SequenceNumber) {   // L715...return nil}...cert := instance.getCert(preprep.View, preprep.SequenceNumber) // L732...if instance.primary(instance.view) != instance.id && instance.prePrepared(preprep.BatchDigest, preprep.View, preprep.SequenceNumber) && !cert.sentPrepare {             // L758...cert.sentPrepare = true                         // L766...}...
}

ProcessEvent函数中的switch判断后进入PrePrepare case,继而调用recvPrePrepare函数。recvPrePrepare函数主要保存preprepare消息,并发送prepare消息。
recvPrePrepare函数首先做了一些检查,这里挑选了一个比较有意思的:L715行检查收到的preprep消息是否是当前View中,且SequenceNumber在一定的区间范围内。
L758行的判断语句比较长,主要包括了三部分:

  1. 判断当前节点是否是PrimaryPBFTPrepare阶段不需要Primary节点发送Prepare消息。
  2. 判断当前是否已经到达了preprepared的状态。OSDI‘99的论文中并未定义preprepared的状态原语,这里是代码实现中的额外定义。关于preprepared状态的检查在后面再进行介绍。
  3. 判断是否已经发送过了prepare消息了。sentPrepare是一个bool类型的变量,其在L766行赋值为true

以下介绍对preprepared状态的检查。prePrepared函数定义如下:

// [pbft-core.go] ProcessEvent -> recvPrePrepare -> prePrepared
func (instance *pbftCore) prePrepared(digest string, v uint64, n uint64) bool {...cert := instance.certStore[msgID{v, n}]                    // L483if cert != nil {p := cert.prePrepareif p != nil && p.View == v && p.SequenceNumber == n && p.BatchDigest == digest {                                    // L486return true}}...
}

prePrepared函数中主要对instance的字典类型字段certStore进行了查找,查找该字典中是否已经存储了相应的prePrepare内容(L483行),并且查询得到的prePrepare内容与参数中的digestvn保持一致(L486行)。
其中L483行是读certStore字段,那么在此之前应该对certStore字段进行了写操作。该写操作发生在recvPrePrepare函数的L734行,相关代码如下所示。

// [pbft-core.go] recvPrePrepare
func (instance *pbftCore) recvPrePrepare(preprep *PrePrepare) error {...cert := instance.getCert(preprep.View, preprep.SequenceNumber)  // L732...
}// [pbft-core.go] recvPrePrepare -> getCert
func (instance *pbftCore) getCert(v uint64, n uint64) (cert *msgCert) {idx := msgID{v, n}cert, ok := instance.certStore[idx]if ok {return}cert = &msgCert{}instance.certStore[idx] = certreturn
}

容易看到recvPrePrepare函数的L732行调用了getCert函数,后者首先查看certStore中是否已经保存了相应的idx,如果没有,则生成一个idx并存储在certStore中。
回到recvPrePrepare函数的L758行,相关代码如下所示。当第一次收到preprepare消息时,该判断语句值为true。首先在L760行定义了代表prepare消息的prep变量。该变量将在L768行和L769行分别发送给自己和其他节点。L769行构建了代表prepare消息的Message_Prepare变量,并通过innerBroadcast函数发送出去。innerBroadcast函数的调用过程和4.2小节中的类似。

这里的代码实现和OSDI‘99论文中的描述有些出入,OSDI’99论文中prepare消息不发送给自己;相应地,后面prepare消息的计数也就不包括自己的。这里的代码实现中,L768行将prepare消息发给了自己;并且,后面的prepare消息的计数也包括了自己的。

// [pbft-core.go] ProcessEvent -> recvPrePrepare
func (instance *pbftCore) recvPrePrepare(preprep *PrePrepare) error {...if instance.primary(instance.view) != instance.id && instance.prePrepared(preprep.BatchDigest, preprep.View, preprep.SequenceNumber) && !cert.sentPrepare {             // L758...prep := &Prepare{                               // L760...}...instance.recvPrepare(prep)                      // L768return instance.innerBroadcast(&Message{Payload: &Message_Prepare{Prepare: prep}})                       // L769}...
}

此外,有个疑惑是L769行不是实现了prepare消息的广播吗?广播不就已经包括了当前节点(即节点0)了吗?为什么还要使用L768行给自己发送一次呢。这是因为innerBroadcast的代码实现中去除了对当前节点的消息发送。4.2小节已经介绍了innerBroadcast函数中对其他函数的调用链路,即innerBroadcast->simpleConsumer.broadcast->Broadcast->broadcastFilter->internalQueueMessageinternalQueueMessage函数中的消息通过net.msgs通道被testnet.process函数读取。process函数进一步调用processMessageFromChanneldeliverFilter。也就是说,prepare消息的广播是通过deliverFilter函数完成的。deliverFilter函数的相关代码摘录如下:

// [mock_network_test.go]
func (net *testnet) deliverFilter(msg taggedMsg) {...if msg.dst == -1 {...for id, ep := range net.endpoints {...lid := id...go func() {...if msg.src == lid {...return}}}}...
}

从代码中容易发现,当消息的发送着msg.src和消息的接受者lid相同时,直接从协程中返回。

6. Prepare消息的接收以及Commit消息的发送

第5节介绍了节点1中对Prepare消息的发送,本节介绍节点(以节点2为例)对Prepare的接收,基于接收到的Prepare消息判断是否进入了Prepared状态,并决定是否发送Commit消息。
结合4.2小节和第5节可知,节点1中的Prepare消息最终是发送到了节点2events通道中。
此外,4.2小节中提到每个节点会启动一个协程运行eventloop函数。为唤起读者的记忆,eventLoop函数的代码再次摘录如下。

// [pbft-core_mock_test.go] makePBFTNetwork -> [events.go] Start -> eventLoop
func (em *managerImpl) eventLoop() {for {select {case next := <-em.events:em.Inject(next)...}}
}

eventLoop函数进一步调用Inject函数、SendEvent函数、pbftCore.ProcessEvent函数。以上都和节点0接收reqBatch消息(节点1接收PrePrepare消息)时的流程相同,以下从pbftCore.ProcessEvent函数开始介绍。

// [pbft-core.go] ProcessEvent
func (instance *pbftCore) ProcessEvent(e events.Event) events.Event {...switch et := e.(type) {...case *Prepare:                                      // L344err = instance.recvPrepare(et)...}
}

此时的ProcessEvent函数中switch选择为Prepare case,并调用recvPrepare函数,对应于L344行代码。recvPrepare函数和第5节中介绍的recvPrePrepare函数很类似,关键代码摘录如下:

// [pbft-core.go] ProcessEvent -> recvPrepare
func (instance *pbftCore) recvPrepare(prep *Prepare) error {...if instance.primary(prep.View) == prep.ReplicaId {          // L779...return nil}...cert := instance.getCert(prep.View, prep.SequenceNumber)    // L794...cert.prepare = append(cert.prepare, prep)                 // L802...return instance.maybeSendCommit(prep.BatchDigest, prep.View, prep.SequenceNumber)                                        // L805
}

recvPrepare函数首先在L779行做了一次判断:若prepare消息的发送方是Primary,则直接返回。这和OSDI‘99论文中是保持一致的,即不需要也不考虑Primary发送的prepare消息。L794行和L802行将新收到的prepare消息存储到cert.prepare字段中。然后便是在L805行调用maybeSendCommit函数。相关代码如下所示:

// [pbft-core.go] ProcessEvent -> recvPrepare -> maybeSendCommit
func (instance *pbftCore) maybeSendCommit(digest string, v uint64, n uint64) error {cert := instance.getCert(v, n)if instance.prepared(digest, v, n) && !cert.sentCommit {    // L811...commit := &Commit{...}cert.sentCommit = trueinstance.recvCommit(commit)return instance.innerBroadcast(&Message{&Message_Commit{commit}})}return nil
}

容易发现,maybeSendCommit函数和recvPrePrepare函数很类似,最大的不同就是L811行中prepared原语的判断。该prepared原语在OSDI‘99论文中也有定义。prepared函数的代码实现如下:

// [pbft-core.go] ProcessEvent -> recvPrepare -> maybeSendCommit -> prepared
func (instance *pbftCore) prepared(digest string, v uint64, n uint64) bool {...quorum := 0                                              // L504cert := instance.certStore[msgID{v, n}]...for _, p := range cert.prepare {if p.View == v && p.SequenceNumber == n && p.BatchDigest == digest {quorum++}}                                                       // L514...return quorum >= instance.intersectionQuorum()-1            // L519
}// [pbft-core.go] ProcessEvent -> recvPrepare -> maybeSendCommit -> prepared -> intersectionQuorum
func (instance *pbftCore) intersectionQuorum() int {return (instance.N + instance.f + 2) / 2
}

prepared函数的实现还是比较容易理解的,首先在L504行到L514行对收到的prepare消息进行计数,计数值为quorum。然后将quorum值与intersectionQuorum-1值进行比较(L519行)。intersectionQuorum值的计算由intersectionQuorum函数完成。当N>3f+1时,intersectionQuorum-1值为2f。这里需要特别强调,prepared状态的判断只需要prepare消息数量达到2f即可;作为对比,下一小节中committed状态的判断需要commit消息数量达到2f+1.

回到maybeSendCommit函数的L811行,当节点2prepare计数超过quorum值时,标记当前节点进入prepared状态,从而开始广播commit消息。

Fabric中PBFT源码解读 (3)相关推荐

  1. Fabric中PBFT源码解读——Checkpoint机制

    文章目录 1. 写在前面 1.1 前置阅读 1.2 对TestCheckpoint函数的测试 2. 对TestCheckpoint函数运行流程的解读 2.1 Checkpoint和Water mark ...

  2. spark源码解读3之RDD中top源码解读

    更多代码请见:https://github.com/xubo245/SparkLearning spark源码解读系列环境:spark-2.0.1 (20161103github下载版) 1.理解 输 ...

  3. mybatis plus 中 EntityWrapper源码解读

    mybatis plus内置了好多CRUD,其中 EntityWrapper这个类就是. 这个类是mybatis plus帮我们写好的好多接口,就如同我们在dao层写好方法在xml中实现一样. 那么这 ...

  4. Thread 中 ThreadLocal 源码解读

    先了解一下ThreadLocal类提供的几个方法: public T get() { } public void set(T value) { } public void remove() { } p ...

  5. pytorch中SGD源码解读

    调用方法: torch.optim.SGD(params, lr=<required parameter>, momentum=0, dampening=0, weight_decay=0 ...

  6. BitXHub 跨链插件(Fabric)源码解读

    前言 趣链科技的BitXHub跨链平台是业界较为完善的跨链开源解决方案,主要通过中继链.网关和插件机制对跨链流程中的功能.安全性和灵活性等进行了优化.本文对BitXHub的meshplus/pier- ...

  7. Hive中lateral view的应用到源码解读

    对于从事大数据开发的同学,经常会应用到explode(炸裂函数)和lateral view(侧输出流). Explode(炸裂函数) 参数必须是array或者map格式(通常跟split函数使用): ...

  8. ios html zfplayer,【iOS】ZFPlayer源码解读中

    前言 本篇继ZFPlayer源码解读基础之上,主要解析说明控制层与播放器,因为在上篇文章至现在并未提及丝毫关于这两个类业务的实现. 首先说下这两个类各自的职责. 控制层:主要负责响应与用户之间的交互, ...

  9. 源码解读一:omit.js

    koroFileHeader插件 在插件市场下载koroFileHeader 在setting.json文件中配置一下内容 // 头部注释 "fileheader.customMade&qu ...

最新文章

  1. Android resource linking failed
  2. Linux系统上传文件与下载文件命令
  3. vue商城项目源码_CMS全栈项目之Vue和React篇(下)(含源码)
  4. 计算机软件工作总结,计算机软件培训总结及小结-工作总结
  5. 14条Yahoo(雅虎)十四条优化原则【转】
  6. 《挖财编程题》水花仙数
  7. Android之 如何解决ScrollView 和ListView滑动冲突的问题如何解决ScrollView can host only one direct child
  8. 关于wParam和lParam
  9. IoC框架,依赖注入
  10. TCP/IP协议栈:TCP超时重传机制
  11. windows下springboot项目部署elk日志系统教程elasticsearch与logstash与kibana
  12. php开发面试题---禁用cookie之后,如何使用session
  13. 赋值pingfang(i)=x; 与或非
  14. Android精品开源项目整理_V20140221
  15. Android10.0编译 make api-stubs-docs-update-current-api问题
  16. 世界超长经典名车荟萃
  17. TabWidget当前标签底线颜色
  18. MongoDB 写安全(Write Concern)
  19. HTC VIVE 禁用头盔定位与角度旋转
  20. 最有福气的家庭:不翻旧账、不争对错、不慕虚荣

热门文章

  1. python生成链接二维码,保存到本地
  2. [Pytorch系列-60]:循环神经网络 - 中文新闻文本分类详解-2-LSTM网络训练与评估代码详解
  3. 科大奥瑞物理实验——光强调制法测光速
  4. 用安卓写一个简单的计算器 android studio
  5. 华为十年,一位大牛的独白
  6. 安卓app与服务器通信协议,Android中的HTTP通信-总结
  7. 页面布局的方式——前端
  8. python的安装和配置环境变量
  9. 十年研发经验工程师的嵌入式学习书籍大推荐(转帖)
  10. 腾讯云备案授权码生成及常见问题解答