这块内容主要是一个拜占庭的过程。本文掐头去尾,只讲一下在 quorum 中是如何实现拜占庭的,也就是共识接口 Seal() 下向拜占庭发送了一个区块的请求事件开始。请求事件如下:

// post block into Istanbul engine
go sb.EventMux().Post(istanbul.RequestEvent{Proposal: block,
})

前言

在说正文之前,先讲四个变量:sequence round state Code,因为这四个变量覆盖了 BFT 的整个流程,可以说 BFT 玩的就是这几个变量。

// 视图包括round和sequence两个数字
//sequence是我们即将提交的区块号
//每个round都有一个数字,而且包含三步:preprepare, prepare and commit// 如果给定一个区块没有被验证者接收,round将会改变,并且验证者将使用 round+1 这个新的round开始新的验证周期
type View struct {Round    *big.IntSequence *big.Int
}

state 表述了每个 round 周期中,进行到了哪一步,是每个 round 周期的状态。

type State uint64const (StateAcceptRequest State = iotaStatePrepreparedStatePreparedStateCommitted
)

标明发送消息的类型

const (msgPreprepare uint64 = iotamsgPreparemsgCommitmsgRoundChangemsgAll   // 未使用
)type message struct {Code          uint64  // 消息的类型Msg           []byte  // 消息的内容Address       common.Address // 发送消息的人Signature     []byte  // 消息的签名CommittedSeal []byte  // 提交消息到区块额外字段时用到
}

BFT正文

从这里开始正式对 quorumistanbul 共识的 BFT 流程进行说明,开始的文件是 consensus/istanbul/core/handler.go

启动BFT

在启动挖矿的时候会启动 BFT,此时开始一个新的 BFT 轮次,然后订阅事件并启动对事件的监听及处理。

func (c *core) Start() error {// Start a new round from last sequence + 1// 开始一个新的轮次c.startNewRound(common.Big0)// Tests will handle events itself, so we have to make subscribeEvents()// be able to call in test.// 订阅事件,事件包括:// |-- 普通拜占庭事件//   |-- 区块提案请求事件//      |-- 消息事件,又分为四种://       |--preprepare//         |--prepare//        |--commit//         |--roundChange//     |-- 积压消息事件// |-- round周期超时事件// |--区块头提交事件c.subscribeEvents()// 启动一个协程来处理不同的事件go c.handleEvents()return nil
}

处理不同的事件

要处理的事件可分为三大类:普通拜占庭事件、round周期超时事件、区块头提交事件普通拜占庭事件监听的是 BFT 一个轮次的流程。round周期超时事件 则是监听每个 BFT 轮次的时间,只要超时就会结束本次 BFT 流程,然后开始一个新的 BFT 流程。区块头提交事件 则是只要矿工收到区块头,就结束本次 BFT,然后开始一个新块的 BFT

func (c *core) handleEvents() {// Clear statedefer func() {c.current = nilc.handlerWg.Done()}()c.handlerWg.Add(1)for {select {case event, ok := <-c.events.Chan():if !ok {return}// A real event arrived, process interesting content// 一个真正人事件到达,处理有趣的内容switch ev := event.Data.(type) {case istanbul.RequestEvent: // 普通拜占庭事件r := &istanbul.Request{ // 区块提案请求事件Proposal: ev.Proposal,}// 处理区块提案请求err := c.handleRequest(r)// 如果是未来的请求则保存if err == errFutureMessage {c.storeRequestMsg(r)}case istanbul.MessageEvent: // 各自消息事件if err := c.handleMsg(ev.Payload); err == nil {// 如果没错误,通过gossip协议发送给其它验证器(不包括自己)c.backend.Gossip(c.valSet, ev.Payload)}case backlogEvent: // 积压消息事件// No need to check signature for internal messages// 没必要检查内部消息的签名,因为消息事件中已检查过了if err := c.handleCheckedMsg(ev.msg, ev.src); err == nil {p, err := ev.msg.Payload()if err != nil {c.logger.Warn("Get message payload failed", "err", err)continue}c.backend.Gossip(c.valSet, p)}}case _, ok := <-c.timeoutSub.Chan():// 拜占庭round周期超时事件if !ok {return}c.handleTimeoutMsg()case event, ok := <-c.finalCommittedSub.Chan(): //区块头提交事件if !ok {return}switch event.Data.(type) {case istanbul.FinalCommittedEvent:c.handleFinalCommitted()}}}
}

后两个比较简单,这里贴一下代码和注释。
round周期超时事件

// round 超时后处理方式
func (c *core) handleTimeoutMsg() {// If we're not waiting for round change yet, we can try to catch up// the max round with F+1 round change message. We only need to catch up// if the max round is larger than current round.// 如果不需要等待round值修改if !c.waitingForRoundChange {//消息数量大于等于 f+1 的round中, 取round值最大的那个round值,maxRound := c.roundChangeSet.MaxRound(c.valSet.F() + 1)if maxRound != nil && maxRound.Cmp(c.current.Round()) > 0 {//修改 round 轮次c.sendRoundChange(maxRound)return}}// 拿到最后一次申请的提案lastProposal, _ := c.backend.LastProposal()// 如果提案不是空的,而且提案不小于当前的区块号,则开始生产新块的轮次if lastProposal != nil && lastProposal.Number().Cmp(c.current.Sequence()) >= 0 {c.logger.Trace("round change timeout, catch up latest sequence", "number", lastProposal.Number().Uint64())// 开始一个全新round轮次c.startNewRound(common.Big0)} else {// 否则,进入下一个round轮次,也就是round+1c.sendNextRoundChange()}
}

区块头提交事件

func (c *core) handleFinalCommitted() error {logger := c.logger.New("state", c.state)logger.Trace("Received a final committed proposal")// 开始一个新的轮次c.startNewRound(common.Big0)return nil
}

然后开始重头戏,BFT的正常流程。

先简单介绍一下BFT的正常流程:

——开始一个新的 round 轮次,round=0sequence=当前区块链高度+1state状态置为StateAcceptRequest,启动 round 的计时器。

——区块提案申请人提交提案。

——收到请求后判断,如果是未来消息,则保存该请求到待处理请求的优先队列中。否则发送 preprepare 消息。

——把当前 View(round sequence) 和请求提案(区块)包装成 preprepare 消息,广播出去,广播前会对消息进行签名处理。

——收到preprepare消息后处理消息,如果验证区块提案时发现这是一个未来区块,则把它放到积压消息的优先队列中,到期处理。如果 区块Hash 被锁定且一致,则发送一个 commit消息。如果是正常流程,则切换 state状态StatePreparepared 并发送 prepare消息

——把 View(round sequence)Hash 包装成 prepare消息,广播出去,广播前会对消息进行签名处理。

——收到 prepare消息 后处理消息,检查 round、sequence 来确保消息的有效性,验证消息签名的正确性,当收到 2f+1 条消息后,锁定 Hash、切换 state 状态为 StatePrepared 并发送 commit消息

——把 View(round sequence)Hash 包装成 commit消息,广播出去,广播前会对消息进行签名处理。

——收到 commit消息 后处理消息,检查 round、sequence 来确保消息的有效性,验证消息签名的正确性,当收到 2f+1 条消息后,锁定 Hash 并提交。

——切换 state 状态为 StateCommitted,然后整理共识数据并插入区块的 额外字段extra 中,如果插入失败,启动当前区块的下一轮 round 共识。

接下来开始详细介绍,处理普通拜占庭事件时,又会监听三种事件:区块提案请求事件、各种消息事件、积压消息事件

启动新的BFT轮次

在说这三种事件之前,先介绍 startNewRound(round *big.Int) 方法,这个方法是启动一个新的 BFT 轮次,如果传入的 round 值是 0,代表要生产一个新的区块。
首先从区块链中获取最高的区块,然后和 sequence 做比较来判断共识的区块是否正确,如果 >sequence,证明当前共识的区块提案已过时,需要共识后面的区块,如果 =sequence-1,证明要共识的区块提案没问题了,然后用 传入 round当前round 比较来判断,如果 传入round=0,则证明当前轮次已开始,不必重复开始当前轮次,直接结束,如果 传入round 大于 当前round,证明需要修改 round 值,则直接修改 round 值,此时 sequence 值不变(如果不需要修改 round 值时,sequence 值为 最高区块号+1round 值为 0,通过最高区块获取验证器)。接下来做一些本次 round 轮次的 准备工作,清空当前轮次无效的信息集全,准备存放收到的消息,更新 current,计算新的申请人。

计算新的申请人的方法,
首先得到一个种子seed,
然后种子seed对验证者数量取模pick,
pick做为验证者集合的偏移量,得到验证者。计算种子的方法有两种,roundRobinProposer和stickyProposer,选择方式是在创世区块配置文件中配置的。
roundRobinProposer:
如果上一个区块的生产者是空,种子seed取round。
否则上一个区块的生产者在集合中的偏移量+round+1
stickyProposer:
如果上一个区块的生产者是空,种子seed取round。
否则上一个区块的生产者在集合中的偏移量+round

如果 round 修改了,而且当前节点是申请人,则发送一个 preprepare消息,如果 Hash 被锁定,则发送当前区块提案,如果当前待处理请求队列中有内容,则发送待处理请求队列中的内容。最后启动一个 roundChange计时器

下面来处理拜占庭监听


func (c *core) handleEvents() {// Clear statedefer func() {c.current = nilc.handlerWg.Done()}()c.handlerWg.Add(1)for {select {case event, ok := <-c.events.Chan():if !ok {return}// A real event arrived, process interesting content// 一个真正人事件到达,处理有趣的内容switch ev := event.Data.(type) {case istanbul.RequestEvent: // 普通拜占庭事件r := &istanbul.Request{ // 区块提案请求事件Proposal: ev.Proposal,}// 处理区块提案请求err := c.handleRequest(r)// 如果是未来的请求则保存if err == errFutureMessage {c.storeRequestMsg(r)}case istanbul.MessageEvent: // 各自消息事件if err := c.handleMsg(ev.Payload); err == nil {// 如果没错误,通过gossip协议发送给其它验证器(不包括自己)c.backend.Gossip(c.valSet, ev.Payload)}case backlogEvent: // 积压消息事件// No need to check signature for internal messages// 没必要检查内部消息的签名,因为消息事件中已检查过了if err := c.handleCheckedMsg(ev.msg, ev.src); err == nil {p, err := ev.msg.Payload()if err != nil {c.logger.Warn("Get message payload failed", "err", err)continue}c.backend.Gossip(c.valSet, p)}}case _, ok := <-c.timeoutSub.Chan():// 拜占庭round周期超时事件if !ok {return}c.handleTimeoutMsg()case event, ok := <-c.finalCommittedSub.Chan(): //区块头提交事件if !ok {return}switch event.Data.(type) {case istanbul.FinalCommittedEvent:c.handleFinalCommitted()}}}
}

istanbul.RequestEvent 普通拜占庭事件

istanbul共识 需要调用拜占庭达成共识时,会是 Seal() 接口中发送区块提案请求给拜占庭。

// post block into Istanbul enginego sb.EventMux().Post(istanbul.RequestEvent{Proposal: block,})

然后拜占庭就收到了 istanbul.RequestEvent 事件,然后处理该请求。

func (c *core) handleRequest(request *istanbul.Request) error {logger := c.logger.New("state", c.state, "seq", c.current.sequence)// 检查消息是无效的?过时的?未来的?if err := c.checkRequestMsg(request); err != nil {if err == errInvalidMessage { logger.Warn("invalid request")return err}logger.Warn("unexpected request", "err", err, "number", request.Proposal.Number(), "hash", request.Proposal.Hash())return err}logger.Trace("handleRequest", "number", request.Proposal.Number(), "hash", request.Proposal.Hash())// 如果检查没问题,则发送一条preprepare消息c.current.pendingRequest = requestif c.state == StateAcceptRequest {c.sendPreprepare(request)}return nil
}

处理请求的第一步是检查消息。

// 检查request状态
func (c *core) checkRequestMsg(request *istanbul.Request) error {if request == nil || request.Proposal == nil {// 如果消息无效return errInvalidMessage}// 检查到哪一步骤了if c := c.current.sequence.Cmp(request.Proposal.Number()); c > 0 {// 如果消息过时return errOldMessage} else if c < 0 {// 如果消息是未来的,这里会把消息保存起来,下文有介绍return errFutureMessage} else {return nil}
}

如果是由于收到未来消息而报错,则需要保存请求的消息到待处理请求的优先队列中。

// 保存请求消息到待处理队列
func (c *core) storeRequestMsg(request *istanbul.Request) {logger := c.logger.New("state", c.state)logger.Trace("Store future request", "number", request.Proposal.Number(), "hash", request.Proposal.Hash())c.pendingRequestsMu.Lock()defer c.pendingRequestsMu.Unlock()// 未来的消息保存到优先队列c.pendingRequests.Push(request, float32(-request.Proposal.Number().Int64()))
}

那么什么时候处理这些请求呢?在修改状态 state 的时候,而且是当把 state 修改为 StateAcceptRequest 时。修改 state 时也会处理积压的消息。那如何处理待处理请求的优先队列中的请求呢?

// 处理待处理的请求
func (c *core) processPendingRequests() {c.pendingRequestsMu.Lock()defer c.pendingRequestsMu.Unlock()for !(c.pendingRequests.Empty()) {// 从优先队列中取出请求m, prio := c.pendingRequests.Pop()r, ok := m.(*istanbul.Request)if !ok {c.logger.Warn("Malformed request, skip", "msg", m)continue}// Push back if it's a future message// 查检请求的状态err := c.checkRequestMsg(r)if err != nil {// 如果是未来的请求,还放回待处理队列,然后找下一条if err == errFutureMessage {c.logger.Trace("Stop processing request", "number", r.Proposal.Number(), "hash", r.Proposal.Hash())c.pendingRequests.Push(m, prio)break}c.logger.Trace("Skip the pending request", "number", r.Proposal.Number(), "hash", r.Proposal.Hash(), "err", err)continue}c.logger.Trace("Post pending request", "number", r.Proposal.Number(), "hash", r.Proposal.Hash())// 如果检查没问题,则发送请求事件来处理该请求go c.sendEvent(istanbul.RequestEvent{Proposal: r.Proposal,})}
}

istanbul.MessageEvent 各自消息事件

上文说到发送 preprepare 消息,这个消息就是在这里监听处理的。首先检查消息的签名,然后检查消息的地址,如果都没问题,则分别处理不同的消息。

func (c *core) handleMsg(payload []byte) error {logger := c.logger.New()// Decode message and check its signature// 解码消息并检查它的签名msg := new(message)if err := msg.FromPayload(payload, c.validateFn); err != nil {logger.Error("Failed to decode message from payload", "err", err)return err}// Only accept message if the address is valid// 只接收地址是有效的消息_, src := c.valSet.GetByAddress(msg.Address)if src == nil {logger.Error("Invalid address in message", "msg", msg)return istanbul.ErrUnauthorizedAddress}return c.handleCheckedMsg(msg, src)
}

检查消息签名的方法,需要的参数是消息笔验证的回调函数。

func (m *message) FromPayload(b []byte, validateFn func([]byte, []byte) (common.Address, error)) error {// Decode message// 解码消息err := rlp.DecodeBytes(b, &m)if err != nil {return err}// Validate message (on a message without Signature)if validateFn != nil {var payload []byte// 拿到没有签名的消息对象,即把签名的字段置空payload, err = m.PayloadNoSig()if err != nil {return err}// 调用回调函数来检查消息的签名是否正确_, err = validateFn(payload, m.Signature)}// Still return the message even the err is not nil// 返回的消息不是err就是nilreturn err
}

这里再介绍一下验证消息的回调函数

func CheckValidatorSignature(valSet ValidatorSet, data []byte, sig []byte) (common.Address, error) {// 1. Get signature address// 拿到签名地址signer, err := GetSignatureAddress(data, sig)if err != nil {log.Error("Failed to get signer address", "err", err)return common.Address{}, err}// 2. Check validator// 检查签名者地址是不是申请人的if _, val := valSet.GetByAddress(signer); val != nil {return val.Address(), nil}return common.Address{}, ErrUnauthorizedAddress
}

然后分别处理四种不同的消息,如果处理的消息是未来的消息,则把未来的消息保存到积压消息的优先队列中。

func (c *core) handleCheckedMsg(msg *message, src istanbul.Validator) error {logger := c.logger.New("address", c.address, "from", src)// Store the message if it's a future message// 如果是未来的消息,就保存起来testBacklog := func(err error) error {if err == errFutureMessage {c.storeBacklog(msg, src)}return err}switch msg.Code {case msgPreprepare:return testBacklog(c.handlePreprepare(msg, src))case msgPrepare:return testBacklog(c.handlePrepare(msg, src))case msgCommit:return testBacklog(c.handleCommit(msg, src))case msgRoundChange:return testBacklog(c.handleRoundChange(msg, src))default:logger.Error("Invalid message", "msg", msg)}return errInvalidMessage
}

由于保存未来消息是个单独的模块,这里先讲保存未来的消息,然后依次介绍如何处理其他四种消息。

(1)保存未来的消息

保存未来的消息,简单来说,就是搞一个优先队列,然后分别把不同的消息放进队列中即可。

func (c *core) storeBacklog(msg *message, src istanbul.Validator) {logger := c.logger.New("from", src, "state", c.state)// 不保存自己的消息if src.Address() == c.Address() {logger.Warn("Backlog from self")return}logger.Trace("Store future message")c.backlogsMu.Lock()defer c.backlogsMu.Unlock()// 搞一个优先队列logger.Debug("Retrieving backlog queue", "for", src.Address(), "backlogs_size", len(c.backlogs))backlog := c.backlogs[src.Address()]// 如果通过验证者地址没找到对应的优先队列,则创建一个新的if backlog == nil {backlog = prque.New()}// 消息和消息的优先级保存起来switch msg.Code {case msgPreprepare: // 处理preprepare消息// 解码var p *istanbul.Preprepareerr := msg.Decode(&p)if err == nil {// 放进队列保存backlog.Push(msg, toPriority(msg.Code, p.View))}// for msgRoundChange, msgPrepare and msgCommit casesdefault: // 处理其它三种消息,因为保存消息的数据结构不一样,所以分别处理var p *istanbul.Subjecterr := msg.Decode(&p)if err == nil {backlog.Push(msg, toPriority(msg.Code, p.View))}}c.backlogs[src.Address()] = backlog
}

保存消息到优先级队列中,那优先级是怎么定义的呢?

如果是msgRoundChange消息,则-(sequence * 1000)
如果是其它消息,则 -(sequence x 1000 + round x 10 + 消息的优先级)
消息的优先级:msgPreprepare > msgCommit > msgPrepare,分别是1,2,3

保存后的消息是什么时候处理的呢,就是上文提到的,修改 state 状态的时候,由于保存的消息涵盖了消息的所有类型(四种),所以 state 的任何一次修改,都要处理一下积压的未来消息,那是如何处理的呢?


// 处理队列中积压的消息
func (c *core) processBacklog() {c.backlogsMu.Lock()defer c.backlogsMu.Unlock()// 遍历保存的所有的优先队列for srcAddress, backlog := range c.backlogs {//如果得到的队列是空,慢找下一个队列if backlog == nil {continue}// 如果地址为空,则证明该队列中消息取完了,则删除该队列_, src := c.valSet.GetByAddress(srcAddress)if src == nil {// validator is not availabledelete(c.backlogs, srcAddress)continue}logger := c.logger.New("from", src, "state", c.state)isFuture := false // 搞一个未来消息的标识// We stop processing if//   1. backlog is empty//   2. The first message in queue is a future message// 依次取出队列中的消息,但是不处理空的队列和未来消息for !(backlog.Empty() || isFuture) {m, prio := backlog.Pop() // 取出消息和优先级msg := m.(*message)var view *istanbul.Viewswitch msg.Code { // 根据不同的消息取出不同的视图view,作用是判断消息的有效性case msgPreprepare:var m *istanbul.Preprepareerr := msg.Decode(&m)if err == nil {view = m.View}// for msgRoundChange, msgPrepare and msgCommit casesdefault:var sub *istanbul.Subjecterr := msg.Decode(&sub)if err == nil {view = sub.View}}if view == nil {logger.Debug("Nil view", "msg", msg)continue}// Push back if it's a future message// 检查消息的有效性,判定消息是过去?未来的?// 如果是未来的消息,还放回原处err := c.checkMessage(msg.Code, view)// 如果有错误,取下一条消息if err != nil {if err == errFutureMessage {logger.Trace("Stop processing backlog", "msg", msg)backlog.Push(msg, prio)isFuture = truebreak}logger.Trace("Skip the backlog event", "msg", msg, "err", err)continue}logger.Trace("Post backlog event", "msg", msg)// 如果没问题,则广播积压的消息事件go c.sendEvent(backlogEvent{src: src,msg: msg,})}}
}

(2)处理msgPreprepare消息

这里要处理 preprepare消息,上文的区块提案请求中提到发送 preprepare消息,那是怎么发的呢?这是先说发,后说处理。发送消息说起来也简单,如果当前节点是申请人,并且当前的区块序列和提案的区块序列一致,则把 request 中的区块提案包装成 preprepare消息,然后广播出去。

func (c *core) sendPreprepare(request *istanbul.Request) {logger := c.logger.New("state", c.state)// If I'm the proposer and I have the same sequence with the proposal// 如果当前节点是申请人,并且当前的区块序列和提案的区块序列一致if c.current.Sequence().Cmp(request.Proposal.Number()) == 0 && c.IsProposer() {curView := c.currentView()// 把request对象包装成preprepare消息对象preprepare, err := Encode(&istanbul.Preprepare{View:     curView,Proposal: request.Proposal,})if err != nil {logger.Error("Failed to encode", "view", curView)return}// 把preprepare消息对象广播出去c.broadcast(&message{Code: msgPreprepare,Msg:  preprepare,})}
}

在广播消息的时候,会对消息进行签名、rlp编码 生成最终消息,处理完成后,才会调用广播的方式把消息广播出去,生成最终消息的处理方式如下:

func (c *core) finalizeMessage(msg *message) ([]byte, error) {var err error// Add sender address 消息的地址是当前节点的地址msg.Address = c.Address()// Add proof of consensus 在广播commit消息的时候,填充这个字段,最终会放在区块的额外字段中msg.CommittedSeal = []byte{}// Assign the CommittedSeal if it's a COMMIT message and proposal is not nilif msg.Code == msgCommit && c.current.Proposal() != nil {seal := PrepareCommittedSeal(c.current.Proposal().Hash())msg.CommittedSeal, err = c.backend.Sign(seal)if err != nil {return nil, err}}// Sign message 拿到未签名的消息数据data, err := msg.PayloadNoSig()if err != nil {return nil, err}// 消息签名msg.Signature, err = c.backend.Sign(data)if err != nil {return nil, err}// Convert to payload 对签名后的消息进行rlp编码payload, err := msg.Payload()if err != nil {return nil, err}return payload, nil
}

在这里开始真正地处理 preprepare消息。首先检查消息的有效性,如果消息是旧消息,而且区块提案存在且申请人匹配,则广播一个旧区块的 comit消息。检查消息的申请人是否正确,验证区块是否有问题,如果这是一个未来的 区块,则到期广播该消息,如果是其它错误,则进入下一个共识轮次。如果前面检查没问题,则进入真正的 preprepare消息 处理流程中,如果 当前Hash 没有被锁定,则修改 state 状态并发送 prepare消息,否则判断 锁定的Hash 是否是 当前提案的Hash,如果是,则修改 state状态 并发送 commit消息,如果不是,则进入下一个共识轮次。


func (c *core) handlePreprepare(msg *message, src istanbul.Validator) error {logger := c.logger.New("from", src, "state", c.state)// Decode PRE-PREPARE  把preprepare消息解码为preprepare对象var preprepare *istanbul.Preprepareerr := msg.Decode(&preprepare)if err != nil {return errFailedDecodePreprepare}// Ensure we have the same view with the PRE-PREPARE message// If it is old message, see if we need to broadcast COMMIT// 检查消息的有效性,判定消息是过去?未来的?if err := c.checkMessage(msgPreprepare, preprepare.View); err != nil {// 如果收到的是旧新消息if err == errOldMessage {// Get validator set for the given proposal// 获取给定区块的验证器valSet := c.backend.ParentValidators(preprepare.Proposal).Copy()// 获取前一个提案的申请人previousProposer := c.backend.GetProposer(preprepare.Proposal.Number().Uint64() - 1)// 计算区块提案的申请人valSet.CalcProposer(previousProposer, preprepare.View.Round.Uint64())// Broadcast COMMIT if it is an existing block// 1. The proposer needs to be a proposer matches the given (Sequence + Round)// 2. The given block must exist// 如果这个区块已存在,广播一个commit消息// 1.区块提案的申请人必须匹配// 2.这个区块提案必须存在if valSet.IsProposer(src.Address()) && c.backend.HasPropsal(preprepare.Proposal.Hash(), preprepare.Proposal.Number()) {c.sendCommitForOldBlock(preprepare.View, preprepare.Proposal.Hash())return nil}}return err}// Check if the message comes from current proposer// 检查消息是否来自当前申请人if !c.valSet.IsProposer(src.Address()) {logger.Warn("Ignore preprepare messages from non-proposer")return errNotFromProposer}// Verify the proposal we received// 验证我们收到的区块if duration, err := c.backend.Verify(preprepare.Proposal); err != nil {// 区块验证失败logger.Warn("Failed to verify proposal", "err", err, "duration", duration)// if it's a future block, we will handle it again after the duration// 如果这是一个未来的区块,我们将到期处理它,因为消息提前了duration时间if err == consensus.ErrFutureBlock {c.stopFuturePreprepareTimer()//duration 时间后广播该事件c.futurePreprepareTimer = time.AfterFunc(duration, func() {c.sendEvent(backlogEvent{src: src,msg: msg,})})} else {// 如果是其它错误,进入下一个轮次的共识c.sendNextRoundChange()}return err}// Here is about to accept the PRE-PREPARE// 这里真正处理preprepare消息if c.state == StateAcceptRequest {// Send ROUND CHANGE if the locked proposal and the received proposal are different// 如果区块被锁定而且收到的区块是不同的,发送一个改变round的事件进入下一个轮次if c.current.IsHashLocked() {// 如果当前提案正好是锁定的Hash,直接发一个commit消息if preprepare.Proposal.Hash() == c.current.GetLockedHash() {// Broadcast COMMIT and enters Prepared state directly// 设置preprepare消息,状态变成 prepare,广播 commit 消息c.acceptPreprepare(preprepare)c.setState(StatePrepared)c.sendCommit()} else {// Send round change// 进入下一个轮次c.sendNextRoundChange()}} else {// Either//   1. the locked proposal and the received proposal match//   2. we have no locked proposal// 设置preprepare消息,状态变成 preprepare,广播 prepare 消息c.acceptPreprepare(preprepare)c.setState(StatePreprepared)c.sendPrepare()}}return nil
}

**(3)处理msgPrepare消息**

上文说到发送 prepare消息,这是介绍一下如何发送。首先,把消息编码成 prepare消息,然后广播出去,广播方式上文介绍过了,这里不再赘述。

然后开始处理 prepare消息,首先把消息解码成 prepare消息,然后检查区块的有效性,确保消息不是未来的,也不是过去的,接着验证 prepare消息 和当前轮次 区块的Hash 是一致的,之后保存消息到当前轮次。如果 当前Hash 被锁定且等消息中带的 区块Hash,或者收到了 2f及以上 的投票且状态在 StatePrepared 之前,则锁定 Hash,修改状态为 StatePrepared,并发送 commit消息


func (c *core) handlePrepare(msg *message, src istanbul.Validator) error {// Decode PREPARE message// 解码 prepare 消息var prepare *istanbul.Subjecterr := msg.Decode(&prepare)if err != nil {return errFailedDecodePrepare}// 检查 prepare 消息,确保视图相同,轮次和顺序if err := c.checkMessage(msgPrepare, prepare.View); err != nil {return err}// If it is locked, it can only process on the locked block.// Passing verifyPrepare and checkMessage implies it is processing on the locked block since it was verified in the Preprepared state.// 验证 prepare 消息if err := c.verifyPrepare(prepare, src); err != nil {return err}c.acceptPrepare(msg, src)// Change to Prepared state if we've received enough PREPARE messages or it is locked// and we are in earlier state before Prepared state.//如果当前Hash被锁定且等消息中带的区块Hash,或者收到了2f及以上的投票且状态在StatePrepared之前// 如果收到了 2f 的确认票, 且未广播commit消息,修改状态为 prepare,然后广播 commit 消息if ((c.current.IsHashLocked() && prepare.Digest == c.current.GetLockedHash()) || c.current.GetPrepareOrCommitSize() > 2*c.valSet.F()) &&c.state.Cmp(StatePrepared) < 0 {c.current.LockHash()c.setState(StatePrepared)c.sendCommit()}return nil
}

**(4)处理msgCommit消息**

发送 commit消息 就不细说了,但是在广播 commit消息 的时候,需要把 消息的Hash消息的代码Code 组合起来被私钥签名,保存到消息的 CommittedSeal 字段中,以备后续保存到区块的额外字段中。

这里着重介绍如何处理 commit消息。首先把消息解码成 commit消息,然后检查区块的有效性,确保消息不是未来的,也不是过去的,接着验证 commit消息 和当前轮次区块的 Hash 是一致的,之后保存消息到当前轮次。如果收到了 2f及以上 的投票,且状态在 StateCommitted 之前,则锁定 Hash,并提交。

提交的时候,首先修改状态 stateStateCommitted,然后整理共识数据,即把 commit 池中的消息取出来,最终会把这些消息放入区块头和额外字段中,然后更新区块的区块头,通过管道通知矿工共识完成,可以把区块加入区块链并广播区块了。

(5)处理msgRoundChange消息

先说一下什么情况下会发送 roundChange消息,有三种情况:1.round周期超时2.需要开始下一个round轮次3.处理roundChange消息的时候

round周期超时 后,会选择 round轮次 中收到的消息数据不小于 f+1round 中,round 值最大的那个 round 值。需要开始下一个 round 轮次则是 前一个round+1round 值。至于第三种情况,下文详细介绍,这里先介绍如何发送 roundChange消息

首先就是修改 round,而 sequence 不变,然后把该消息编码并广播出去。

// 发送一个给定的 round 来修改 round
func (c *core) sendRoundChange(round *big.Int) {logger := c.logger.New("state", c.state)cv := c.currentView()// 保证新修改的round不得小于原来的roundif cv.Round.Cmp(round) >= 0 {logger.Error("Cannot send out the round change", "current round", cv.Round, "target round", round)return}// 修改roundc.catchUpRound(&istanbul.View{// The round number we'd like to transfer to.Round:    new(big.Int).Set(round),Sequence: new(big.Int).Set(cv.Sequence),})// Now we have the new round number and sequence number// 现在我们拥有新的 round 和 sequencecv = c.currentView()rc := &istanbul.Subject{View:   cv,Digest: common.Hash{},}// 修改round的消息编码payload, err := Encode(rc)if err != nil {logger.Error("Failed to encode ROUND CHANGE", "rc", rc, "err", err)return}// 把修改round的消息广播出去c.broadcast(&message{Code: msgRoundChange,Msg:  payload,})
}

这个过程中更新 round 的方法需要重点介绍一下。首先打开等待 roundChange 修改的开关,然后修改 round,清空修改后的 round 中的内容,启动一个新的 roundChange计时器

func (c *core) catchUpRound(view *istanbul.View) {logger := c.logger.New("old_round", c.current.Round(), "old_seq", c.current.Sequence(), "old_proposer", c.valSet.GetProposer())if view.Round.Cmp(c.current.Round()) > 0 {c.roundMeter.Mark(new(big.Int).Sub(view.Round, c.current.Round()).Int64())}c.waitingForRoundChange = true// Need to keep block locked for round catching up// 修改roundc.updateRoundState(view, c.valSet, true)c.roundChangeSet.Clear(view.Round) // 清空当前round中的内容c.newRoundChangeTimer() // 搞一个新的计时器logger.Trace("Catch up round", "new_round", view.Round, "new_seq", view.Sequence, "new_proposer", c.valSet)
}

修改round的本质就是创建一个新的 roundState 放在 current属性 中。

从这里开始处理 roundChange消息,解码和检查消息的有效性不说了,然后 把消息添加到 roundChange消息 集中并返回这个区块的这个轮次中共有多少消息,如果数量等于 f+1,继续发 roundChange消息,如果 数量等于 2f+1,开始一个新的round轮次。


func (c *core) handleRoundChange(msg *message, src istanbul.Validator) error {logger := c.logger.New("state", c.state, "from", src.Address().Hex())// Decode ROUND CHANGE message// 解码 修改round 消息var rc *istanbul.Subjectif err := msg.Decode(&rc); err != nil {logger.Error("Failed to decode ROUND CHANGE", "err", err)return errInvalidMessage}if err := c.checkMessage(msgRoundChange, rc.View); err != nil {return err}cv := c.currentView()roundView := rc.View// Add the ROUND CHANGE message to its message set and return how many// messages we've got with the same round number and sequence number.// 把消息添加到roundChange消息集中并返回这个区块的这个轮次中共有多少消息num, err := c.roundChangeSet.Add(roundView.Round, msg)if err != nil {logger.Warn("Failed to add round change message", "from", src, "msg", msg, "err", err)return err}// Once we received f+1 ROUND CHANGE messages, those messages form a weak certificate.// If our round number is smaller than the certificate's round number, we would// try to catch up the round number.// 一旦收到 f+1 条  修改消息,这些消息组成一个弱证书// 如果我们的 round 号小于 证书的 round 号, 我们将试着加到round号// 如果需要等待roundChange修改,且收到的消息数量等于 f+1if c.waitingForRoundChange && num == int(c.valSet.F()+1) {// 如果当前轮次的round小于收到的消息的roundif cv.Round.Cmp(roundView.Round) < 0 {// 继续发送roundChange消息c.sendRoundChange(roundView.Round)}return nil} else if num == int(2*c.valSet.F()+1) && (c.waitingForRoundChange || cv.Round.Cmp(roundView.Round) < 0) {// We've received 2f+1 ROUND CHANGE messages, start a new round immediately.// 收到 2f+1 条修改消息,立即开始一个新的round// 如果收到的消息数量等于 2f+1, 且 需要等待roundChange修改或者当前轮次的round小于收到的消息的round,则开始一个新的round轮次c.startNewRound(roundView.Round)return nil} else if cv.Round.Cmp(roundView.Round) < 0 {// Only gossip the message with current round to other validators.return errIgnored}return nil
}

backlogEvent 积压消息事件

这块内容是处理积压的消息,由于这些消息来得早了,先保存起来,到期触发本事件执行,本文不详细介绍这块内容了,因为它和上文 istanbul.MessageEvent 各自消息事件 的处理方式基本一样,唯一的差别就是,没有检查这些消息的签名,因为在保存消息前已经检查过了,所以消息的签名是一定没问题的。

quorum中的BFT相关推荐

  1. HotStuff: BFT Consensus in the Lens of Blockchain

    Facebook 近日公布的 Libra 白皮书引起各界持续关注,其网站公开的技术文档也被诸多专家审视,文档提到Libra 区块链将使用基于拜占庭容错共识的「LibraBFT」共识算法,而 Libra ...

  2. 【转】BFT类共识协议概览与分析实测

    摘要 本文首先对BFT类共识协议按照改进思路分为3大类进行综述性概览: 针对无拜占庭错误场景优化的协议,包括PBFT.Zyzzyva等等 针对拜占庭错误场景优化的,包括Aardvark.Primer等 ...

  3. 【论文阅读】Foundations of Dynamic BFT --- IEEE SP ‘22

    文章目录 摘要 1 介绍 2 相关工作 3 系统和威胁模型 A. 静态 BFT B. 动态 BFT C. 动态 BFT 的组成员资格更改 D. 假设 E. 属性规范 F 定义的变体 G. 与先前规范的 ...

  4. hbase中的region

    HBase 基本概念 1.region region 是部分数据,所以是所有数据的一个自己,但region包括完整的行,所以region 是行为单位 表的一个子集. 每个region 有三个主要要素: ...

  5. GNU Make 使用手册(于凤昌中译版)

    GNU Make 使用手册(中译版) 翻译:于凤昌 GNU make Version 3.79 April 2000 Richard M. Stallman and Roland McGrath 1 ...

  6. 联盟链战国:五大巨头横向对比

    联盟链是目前区块链落地实践的热点,也是大家对"杀手级应用"期望最大的区块链部署形态.联盟链的诞生源于对区块链技术的"反思",是对比特币.以太坊所体现的技术特点与 ...

  7. 分布式系统一致性的发展历史 (二)

    2019独角兽企业重金招聘Python工程师标准>>> 在本系列第一篇文章中我们提到了Lamport Clock如何启发人们在分布式系统中开始使用新的的思维方式, 并介绍了Seque ...

  8. 区块链在中国(2):PBFT算法

    上一张我们从分布式系统的角度简单叙述了一下 IBM HyperLedger fabric 的一些基本概念.架构和协议信息.其中最为核心的部分就是共识算法(consensus plugin),fabri ...

  9. Practical Byzantine Fault Tolerance and Proactive Recovery

    Practical Byzantine Fault Tolerance and Proactive Recovery 我们对互联网上可访问的在线服务日益增长的依赖性需要高可用性的无需中断即可提供正确服 ...

最新文章

  1. idea自动生成方法注释(含参数及返回值)
  2. Pandas简明教程:九、表的合并、连接、拼接(数据聚合基础)
  3. 【译】Android中的安全数据-初始化向量
  4. 获取Authorize.Net Transaction Key ( Getting Your Authorize.Net Transaction Key )
  5. 诗与远方:无题(七十七)- 远方
  6. Linux修改后保存与不保存,强制退出vi与vi下查找命令关键步骤!
  7. 第十三节:HttpHander扩展及应用(自定义扩展名、图片防盗链)
  8. UE4之脚本导入fbx
  9. Wpf之MVVM线程问题
  10. 中文文本对齐_终于明白Word如何快速对齐姓名!为之前狂敲空格的我,留下一把泪...
  11. nginx 服务器重启命令,关闭(转)
  12. Layui中文离线版文档
  13. 周立功CAN通讯(txt格式) 报文解析
  14. 上海一本计算机工程应用大学排名,2019上海软科世界一流学科排名计算机科学与工程专业排名密歇根州立大学排名第51-75...
  15. vue之设置背景图片自适应屏幕
  16. 有一只会射子弹的贪食蛇,你见过吗?
  17. 在word中写出打勾的方框
  18. Python验证“哥德巴赫猜想”
  19. GSM系统信令接续流程
  20. chrome修改摄像头权限_如何在Chrome中更改网站的摄像头和麦克风权限

热门文章

  1. IP协议协议--IP分片
  2. 为什么要多用组合少用继承?
  3. 从微信的成功看移动AppUI设计的精髓
  4. 实验四 数据查询——简单查询 Sql Server数据库实验
  5. mysql求2个日期之间的工作日数(周一到周五)
  6. 概率(3)一根木棍折断成3段构成一个三角形的概率
  7. Python字符串函数的使用
  8. 公钥密码的三大数学问题
  9. android 9.0 Launcher3去掉默认的google搜索栏
  10. 记——通过点击表头弹出筛选选项列表,点击进行数据筛选