Fabric中PBFT源码解读 (3)
文章目录
- 5. Preprepare消息的接收以及Prepare消息的发送
- 6. Prepare消息的接收以及Commit消息的发送
5. Preprepare消息的接收以及Prepare消息的发送
本系列的第一篇博客介绍了源代码的获取和测试用例的选择,第二篇博客介绍了Primary
节点对repBatch
的处理,以及对PrePrepare
消息的发送。强烈建议读者先阅读上述两篇博客,再来阅读当前这篇。
为方便读者查阅,本系列博客的链接整理如下:
- Fabric中PBFT源码解读 (1)
- Fabric中PBFT源码解读 (2)
- Fabric中PBFT源码解读 (3)
- Fabric中PBFT源码解读 (4)
- Fabric中PBFT源码解读 (5)
4.2小节介绍了节点0
中Preprepare
消息的发送过程,本小节介绍其他节点(以节点1
为例)接收Preprepare
消息的过程和发送Prepare
消息的过程。
由4.2小节的最后介绍可知,节点0
的Preprepare
消息最终发送到了节点1
的events
通道中。
此外,4.2小节中提到每个节点会启动一个协程运行eventloop
函数。为唤起读者的记忆,eventLoop
函数的代码再次摘录如下。
// [pbft-core_mock_test.go] makePBFTNetwork -> [events.go] Start -> eventLoop
func (em *managerImpl) eventLoop() {for {select {case next := <-em.events:em.Inject(next)...}}
}
eventLoop
函数进一步调用Inject
函数、SendEvent
函数、pbftCore.ProcessEvent
函数。以上都和节点0
接收reqBatch
消息时的流程相同,以下从pbftCore.ProcessEvent
函数开始介绍。
// [pbft-core.go] ProcessEvent
func (instance *pbftCore) ProcessEvent(e events.Event) events.Event {...switch et := e.(type) {...case *PrePrepare:err = instance.recvPrePrepare(et)...}
}// [pbft-core.go] ProcessEvent -> recvPrePrepare
func (instance *pbftCore) recvPrePrepare(preprep *PrePrepare) error {...if !instance.inWV(preprep.View, preprep.SequenceNumber) { // L715...return nil}...cert := instance.getCert(preprep.View, preprep.SequenceNumber) // L732...if instance.primary(instance.view) != instance.id && instance.prePrepared(preprep.BatchDigest, preprep.View, preprep.SequenceNumber) && !cert.sentPrepare { // L758...cert.sentPrepare = true // L766...}...
}
ProcessEvent
函数中的switch
判断后进入PrePrepare case
,继而调用recvPrePrepare
函数。recvPrePrepare
函数主要保存preprepare
消息,并发送prepare
消息。
recvPrePrepare
函数首先做了一些检查,这里挑选了一个比较有意思的:L715行检查收到的preprep
消息是否是当前View
中,且SequenceNumber
在一定的区间范围内。
L758行的判断语句比较长,主要包括了三部分:
- 判断当前节点是否是
Primary
。PBFT
的Prepare
阶段不需要Primary
节点发送Prepare
消息。 - 判断当前是否已经到达了
preprepared
的状态。OSDI‘99
的论文中并未定义preprepared
的状态原语,这里是代码实现中的额外定义。关于preprepared
状态的检查在后面再进行介绍。 - 判断是否已经发送过了
prepare
消息了。sentPrepare
是一个bool
类型的变量,其在L766行赋值为true
。
以下介绍对preprepared
状态的检查。prePrepared
函数定义如下:
// [pbft-core.go] ProcessEvent -> recvPrePrepare -> prePrepared
func (instance *pbftCore) prePrepared(digest string, v uint64, n uint64) bool {...cert := instance.certStore[msgID{v, n}] // L483if cert != nil {p := cert.prePrepareif p != nil && p.View == v && p.SequenceNumber == n && p.BatchDigest == digest { // L486return true}}...
}
prePrepared
函数中主要对instance
的字典类型字段certStore
进行了查找,查找该字典中是否已经存储了相应的prePrepare
内容(L483行),并且查询得到的prePrepare
内容与参数中的digest
,v
,n
保持一致(L486行)。
其中L483行是读certStore
字段,那么在此之前应该对certStore
字段进行了写操作。该写操作发生在recvPrePrepare
函数的L734行,相关代码如下所示。
// [pbft-core.go] recvPrePrepare
func (instance *pbftCore) recvPrePrepare(preprep *PrePrepare) error {...cert := instance.getCert(preprep.View, preprep.SequenceNumber) // L732...
}// [pbft-core.go] recvPrePrepare -> getCert
func (instance *pbftCore) getCert(v uint64, n uint64) (cert *msgCert) {idx := msgID{v, n}cert, ok := instance.certStore[idx]if ok {return}cert = &msgCert{}instance.certStore[idx] = certreturn
}
容易看到recvPrePrepare
函数的L732行调用了getCert
函数,后者首先查看certStore
中是否已经保存了相应的idx
,如果没有,则生成一个idx
并存储在certStore
中。
回到recvPrePrepare
函数的L758行,相关代码如下所示。当第一次收到preprepare
消息时,该判断语句值为true
。首先在L760行定义了代表prepare
消息的prep
变量。该变量将在L768行和L769行分别发送给自己和其他节点。L769行构建了代表prepare
消息的Message_Prepare
变量,并通过innerBroadcast
函数发送出去。innerBroadcast
函数的调用过程和4.2小节中的类似。
这里的代码实现和OSDI‘99
论文中的描述有些出入,OSDI’99
论文中prepare
消息不发送给自己;相应地,后面prepare
消息的计数也就不包括自己的。这里的代码实现中,L768行将prepare
消息发给了自己;并且,后面的prepare
消息的计数也包括了自己的。
// [pbft-core.go] ProcessEvent -> recvPrePrepare
func (instance *pbftCore) recvPrePrepare(preprep *PrePrepare) error {...if instance.primary(instance.view) != instance.id && instance.prePrepared(preprep.BatchDigest, preprep.View, preprep.SequenceNumber) && !cert.sentPrepare { // L758...prep := &Prepare{ // L760...}...instance.recvPrepare(prep) // L768return instance.innerBroadcast(&Message{Payload: &Message_Prepare{Prepare: prep}}) // L769}...
}
此外,有个疑惑是L769行不是实现了prepare
消息的广播吗?广播不就已经包括了当前节点(即节点0
)了吗?为什么还要使用L768行给自己发送一次呢。这是因为innerBroadcast
的代码实现中去除了对当前节点的消息发送。4.2小节已经介绍了innerBroadcast
函数中对其他函数的调用链路,即innerBroadcast
->simpleConsumer.broadcast
->Broadcast
->broadcastFilter
->internalQueueMessage
,internalQueueMessage
函数中的消息通过net.msgs
通道被testnet.process
函数读取。process
函数进一步调用processMessageFromChannel
,deliverFilter
。也就是说,prepare
消息的广播是通过deliverFilter
函数完成的。deliverFilter
函数的相关代码摘录如下:
// [mock_network_test.go]
func (net *testnet) deliverFilter(msg taggedMsg) {...if msg.dst == -1 {...for id, ep := range net.endpoints {...lid := id...go func() {...if msg.src == lid {...return}}}}...
}
从代码中容易发现,当消息的发送着msg.src
和消息的接受者lid
相同时,直接从协程中返回。
6. Prepare消息的接收以及Commit消息的发送
第5节介绍了节点1中对Prepare
消息的发送,本节介绍节点(以节点2
为例)对Prepare
的接收,基于接收到的Prepare
消息判断是否进入了Prepared
状态,并决定是否发送Commit
消息。
结合4.2小节和第5节可知,节点1
中的Prepare
消息最终是发送到了节点2
的events
通道中。
此外,4.2小节中提到每个节点会启动一个协程运行eventloop
函数。为唤起读者的记忆,eventLoop
函数的代码再次摘录如下。
// [pbft-core_mock_test.go] makePBFTNetwork -> [events.go] Start -> eventLoop
func (em *managerImpl) eventLoop() {for {select {case next := <-em.events:em.Inject(next)...}}
}
eventLoop
函数进一步调用Inject
函数、SendEvent
函数、pbftCore.ProcessEvent
函数。以上都和节点0
接收reqBatch
消息(节点1
接收PrePrepare
消息)时的流程相同,以下从pbftCore.ProcessEvent
函数开始介绍。
// [pbft-core.go] ProcessEvent
func (instance *pbftCore) ProcessEvent(e events.Event) events.Event {...switch et := e.(type) {...case *Prepare: // L344err = instance.recvPrepare(et)...}
}
此时的ProcessEvent
函数中switch
选择为Prepare case
,并调用recvPrepare
函数,对应于L344行代码。recvPrepare
函数和第5节中介绍的recvPrePrepare
函数很类似,关键代码摘录如下:
// [pbft-core.go] ProcessEvent -> recvPrepare
func (instance *pbftCore) recvPrepare(prep *Prepare) error {...if instance.primary(prep.View) == prep.ReplicaId { // L779...return nil}...cert := instance.getCert(prep.View, prep.SequenceNumber) // L794...cert.prepare = append(cert.prepare, prep) // L802...return instance.maybeSendCommit(prep.BatchDigest, prep.View, prep.SequenceNumber) // L805
}
recvPrepare
函数首先在L779行做了一次判断:若prepare
消息的发送方是Primary
,则直接返回。这和OSDI‘99
论文中是保持一致的,即不需要也不考虑Primary
发送的prepare
消息。L794行和L802行将新收到的prepare
消息存储到cert.prepare
字段中。然后便是在L805行调用maybeSendCommit
函数。相关代码如下所示:
// [pbft-core.go] ProcessEvent -> recvPrepare -> maybeSendCommit
func (instance *pbftCore) maybeSendCommit(digest string, v uint64, n uint64) error {cert := instance.getCert(v, n)if instance.prepared(digest, v, n) && !cert.sentCommit { // L811...commit := &Commit{...}cert.sentCommit = trueinstance.recvCommit(commit)return instance.innerBroadcast(&Message{&Message_Commit{commit}})}return nil
}
容易发现,maybeSendCommit
函数和recvPrePrepare
函数很类似,最大的不同就是L811行中prepared
原语的判断。该prepared
原语在OSDI‘99
论文中也有定义。prepared
函数的代码实现如下:
// [pbft-core.go] ProcessEvent -> recvPrepare -> maybeSendCommit -> prepared
func (instance *pbftCore) prepared(digest string, v uint64, n uint64) bool {...quorum := 0 // L504cert := instance.certStore[msgID{v, n}]...for _, p := range cert.prepare {if p.View == v && p.SequenceNumber == n && p.BatchDigest == digest {quorum++}} // L514...return quorum >= instance.intersectionQuorum()-1 // L519
}// [pbft-core.go] ProcessEvent -> recvPrepare -> maybeSendCommit -> prepared -> intersectionQuorum
func (instance *pbftCore) intersectionQuorum() int {return (instance.N + instance.f + 2) / 2
}
prepared
函数的实现还是比较容易理解的,首先在L504行到L514行对收到的prepare
消息进行计数,计数值为quorum
。然后将quorum
值与intersectionQuorum-1
值进行比较(L519行)。intersectionQuorum
值的计算由intersectionQuorum
函数完成。当N>3f+1时,intersectionQuorum-1
值为2f。这里需要特别强调,prepared
状态的判断只需要prepare
消息数量达到2f即可;作为对比,下一小节中committed
状态的判断需要commit
消息数量达到2f+1.
回到maybeSendCommit
函数的L811行,当节点2
的prepare
计数超过quorum
值时,标记当前节点进入prepared
状态,从而开始广播commit
消息。
Fabric中PBFT源码解读 (3)相关推荐
- Fabric中PBFT源码解读——Checkpoint机制
文章目录 1. 写在前面 1.1 前置阅读 1.2 对TestCheckpoint函数的测试 2. 对TestCheckpoint函数运行流程的解读 2.1 Checkpoint和Water mark ...
- spark源码解读3之RDD中top源码解读
更多代码请见:https://github.com/xubo245/SparkLearning spark源码解读系列环境:spark-2.0.1 (20161103github下载版) 1.理解 输 ...
- mybatis plus 中 EntityWrapper源码解读
mybatis plus内置了好多CRUD,其中 EntityWrapper这个类就是. 这个类是mybatis plus帮我们写好的好多接口,就如同我们在dao层写好方法在xml中实现一样. 那么这 ...
- Thread 中 ThreadLocal 源码解读
先了解一下ThreadLocal类提供的几个方法: public T get() { } public void set(T value) { } public void remove() { } p ...
- pytorch中SGD源码解读
调用方法: torch.optim.SGD(params, lr=<required parameter>, momentum=0, dampening=0, weight_decay=0 ...
- BitXHub 跨链插件(Fabric)源码解读
前言 趣链科技的BitXHub跨链平台是业界较为完善的跨链开源解决方案,主要通过中继链.网关和插件机制对跨链流程中的功能.安全性和灵活性等进行了优化.本文对BitXHub的meshplus/pier- ...
- Hive中lateral view的应用到源码解读
对于从事大数据开发的同学,经常会应用到explode(炸裂函数)和lateral view(侧输出流). Explode(炸裂函数) 参数必须是array或者map格式(通常跟split函数使用): ...
- ios html zfplayer,【iOS】ZFPlayer源码解读中
前言 本篇继ZFPlayer源码解读基础之上,主要解析说明控制层与播放器,因为在上篇文章至现在并未提及丝毫关于这两个类业务的实现. 首先说下这两个类各自的职责. 控制层:主要负责响应与用户之间的交互, ...
- 源码解读一:omit.js
koroFileHeader插件 在插件市场下载koroFileHeader 在setting.json文件中配置一下内容 // 头部注释 "fileheader.customMade&qu ...
最新文章
- Android resource linking failed
- Linux系统上传文件与下载文件命令
- vue商城项目源码_CMS全栈项目之Vue和React篇(下)(含源码)
- 计算机软件工作总结,计算机软件培训总结及小结-工作总结
- 14条Yahoo(雅虎)十四条优化原则【转】
- 《挖财编程题》水花仙数
- Android之 如何解决ScrollView 和ListView滑动冲突的问题如何解决ScrollView can host only one direct child
- 关于wParam和lParam
- IoC框架,依赖注入
- TCP/IP协议栈:TCP超时重传机制
- windows下springboot项目部署elk日志系统教程elasticsearch与logstash与kibana
- php开发面试题---禁用cookie之后,如何使用session
- 赋值pingfang(i)=x; 与或非
- Android精品开源项目整理_V20140221
- Android10.0编译 make api-stubs-docs-update-current-api问题
- 世界超长经典名车荟萃
- TabWidget当前标签底线颜色
- MongoDB 写安全(Write Concern)
- HTC VIVE 禁用头盔定位与角度旋转
- 最有福气的家庭:不翻旧账、不争对错、不慕虚荣