日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
一篇文章帶給你Etcd-Raft學(xué)習(xí)

從本質(zhì)上說,Raft 算法是通過一切以領(lǐng)導(dǎo)者為準(zhǔn)的方式,實現(xiàn)一系列值的共識和各節(jié)點日志的一致

創(chuàng)新互聯(lián)建站長期為超過千家客戶提供的網(wǎng)站建設(shè)服務(wù),團隊從業(yè)經(jīng)驗10年,關(guān)注不同地域、不同群體,并針對不同對象提供差異化的產(chǎn)品和服務(wù);打造開放共贏平臺,與合作伙伴共同營造健康的互聯(lián)網(wǎng)生態(tài)環(huán)境。為立山企業(yè)提供專業(yè)的做網(wǎng)站、成都做網(wǎng)站,立山網(wǎng)站改版等技術(shù)服務(wù)。擁有10余年豐富建站經(jīng)驗和眾多成功案例,為您定制開發(fā)。

  • Leader 選舉,Leader 故障后集群能快速選出新 Leader;
  • 日志復(fù)制, 集群只有 Leader 能寫入日志, Leader 負責(zé)復(fù)制日志到 Follower 節(jié)點,并強制 Follower 節(jié)點與自己保持相同;
  • 安全性,成員變更,一個任期內(nèi)集群只能產(chǎn)生一個 Leader、已提交的日志條目在發(fā)生 Leader 選舉時,一定會存在更高任期的新 Leader 日志中、各個節(jié)點的狀態(tài)機應(yīng)用的任意位置的日志條目內(nèi)容應(yīng)一樣等。

Leader 選舉

raft 算法本質(zhì)上是一個大的狀態(tài)機,任何的操作例如選舉、提交數(shù)據(jù)等,最后都被封裝成一個消息結(jié)構(gòu)體,輸入到 raft 算法庫的狀態(tài)機中。raft 算法其實由好幾個協(xié)議組成,etcd-raft 將其統(tǒng)一定義在了 Message 結(jié)構(gòu)體之中,以下總結(jié)了該結(jié)構(gòu)體的成員用途:

 
 
 
 
  1. type Message struct {
  2. Type             MessageType `protobuf:"varint,1,opt,name=type,enum=raftpb.MessageType" json:"type"` // 消息類型
  3. To               uint64      `protobuf:"varint,2,opt,name=to" json:"to"` // 消息接收者的節(jié)點ID
  4. From             uint64      `protobuf:"varint,3,opt,name=from" json:"from"` // 消息發(fā)送者的節(jié)點 ID
  5. Term             uint64      `protobuf:"varint,4,opt,name=term" json:"term"` // 發(fā)送消息的節(jié)點的Term值。如果Term值為0,則為本地消息,在etcd-raft模塊的實現(xiàn)中,對本地消息進行特殊處理。
  6. LogTerm          uint64      `protobuf:"varint,5,opt,name=logTerm" json:"logTerm"` // 該消息攜帶的第一條Entry記錄的Term值,日志所處的任期ID
  7. Index            uint64      `protobuf:"varint,6,opt,name=index" json:"index"` // 日志索引ID,用于節(jié)點向 Leader 匯報自己已經(jīng)commit的日志數(shù)據(jù)ID
  8. Entries          []Entry     `protobuf:"bytes,7,rep,name=entries" json:"entries"` // 如果是MsgApp類型的消息,則該字段中保存了Leader節(jié)點復(fù)制到Follower節(jié)點的Entry記錄
  9. Commit           uint64      `protobuf:"varint,8,opt,name=commit" json:"commit"` // 消息發(fā)送節(jié)點提交日志索引
  10. Snapshot         Snapshot    `protobuf:"bytes,9,opt,name=snapshot" json:"snapshot"` // 在傳輸快照時,該字段保存了快照數(shù)據(jù)
  11. Reject           bool        `protobuf:"varint,10,opt,name=reject" json:"reject"` // 主要用于響應(yīng)類型的消息,表示是否拒絕收到的消息
  12. RejectHint       uint64      `protobuf:"varint,11,opt,name=rejectHint" json:"rejectHint"` //在Follower節(jié)點拒絕Leader節(jié)點的消息之后,會在該字段記錄一個Entry索引值供Leader節(jié)點
  13. Context          []byte      `protobuf:"bytes,12,opt,name=context" json:"context,omitempty"` // 消息攜帶的一些上下文信息。例如,該消息是否與Leader節(jié)點轉(zhuǎn)移相關(guān)
  14. XXX_unrecognized []byte      `json:"-"`
  15. }

Message結(jié)構(gòu)體相關(guān)的數(shù)據(jù)類型為 MessageType,MessageType 有 19 種。當(dāng)然,并不是所有的消息類型都會用到上面定義的Message結(jié)構(gòu)體中的所有字段,因此其中有些字段是Optinal的。

 
 
 
 
  1.    MsgHup            MessageType = 0  //當(dāng)Follower節(jié)點的選舉計時器超時,會發(fā)送MsgHup消息
  2. MsgBeat           MessageType = 1  //Leader發(fā)送心跳,主要作用是探活,F(xiàn)ollower接收到MsgBeat會重置選舉計時器,防止Follower發(fā)起新一輪選舉
  3. MsgProp           MessageType = 2  //客戶端發(fā)往到集群的寫請求是通過MsgProp消息表示的
  4. MsgApp            MessageType = 3  //當(dāng)一個節(jié)點通過選舉成為Leader時,會發(fā)送MsgApp消息
  5. MsgAppResp        MessageType = 4  //MsgApp的響應(yīng)消息
  6. MsgVote           MessageType = 5  //當(dāng)PreCandidate狀態(tài)節(jié)點收到半數(shù)以上的投票之后,會發(fā)起新一輪的選舉,即向集群中的其他節(jié)點發(fā)送MsgVote消息
  7. MsgVoteResp       MessageType = 6  //MsgVote選舉消息響應(yīng)的消息
  8. MsgSnap           MessageType = 7  //Leader向Follower發(fā)送快照信息
  9. MsgHeartbeat      MessageType = 8  //Leader發(fā)送的心跳消息
  10. MsgHeartbeatResp  MessageType = 9  //Follower處理心跳回復(fù)返回的消息類型
  11. MsgUnreachable    MessageType = 10 //Follower消息不可達
  12. MsgSnapStatus     MessageType = 11 //如果Leader發(fā)送MsgSnap消息時出現(xiàn)異常,則會調(diào)用Raft接口發(fā)送MsgUnreachable和MsgSnapStatus消息
  13. MsgCheckQuorum    MessageType = 12 //Leader檢測是否保持半數(shù)以上的連接
  14. MsgTransferLeader MessageType = 13 //Leader節(jié)點轉(zhuǎn)移時使用,本地消息
  15. MsgTimeoutNow     MessageType = 14 //Leader節(jié)點轉(zhuǎn)移超時,會發(fā)該類型的消息,使Follower的選舉計時器立即過期,并發(fā)起新一輪的選舉
  16. MsgReadIndex      MessageType = 15 //客戶端發(fā)往集群的只讀消息使用MsgReadIndex消息(只讀的兩種模式:ReadOnlySafe和ReadOnlyLeaseBased)
  17. MsgReadIndexResp  MessageType = 16 //MsgReadIndex消息的響應(yīng)消息
  18. MsgPreVote        MessageType = 17 //PreCandidate狀態(tài)下的節(jié)點發(fā)送的消息
  19. MsgPreVoteResp    MessageType = 18 //預(yù)選節(jié)點收到的響應(yīng)消息  

然后是 raft 算法的實現(xiàn),node 結(jié)構(gòu)體實現(xiàn)了 Node 接口,對etcd-raft模塊具體實現(xiàn)的一層封裝,方便上層模塊使用etcd-raft模塊。其定義如下:

 
 
 
 
  1. type node struct {
  2. propc      chan msgWithResult      //該通道用于接收MsgProp類型的消息
  3. recvc      chan pb.Message         //除MsgProp外的其他類型的消息都是由該通道接收的
  4. confc      chan pb.ConfChangeV2    //當(dāng)節(jié)點收到EntryConfChange類型的Entry記錄時,會轉(zhuǎn)換成ConfChange,并寫入該通道中等待處理。在ConfChange中封裝了其唯一 ID、待處理的節(jié)點 ID (NodeID 字段)及處理類型(Type 字段,例如,ConfChangeAddNode類型表示添加節(jié)點)等信息
  5. confstatec chan pb.ConfState       //在ConfState中封裝了當(dāng)前集群中所有節(jié)點的ID,該通道用于向上層模塊返回ConfState實例
  6. readyc     chan Ready              //Ready結(jié)構(gòu)體的功能在上一小節(jié)已經(jīng)介紹過了,該通道用于向上層模塊返回Ready實例,即node.Ready()方法的返回值
  7. advancec   chan struct{}           //當(dāng)上層模塊處理完通過上述readyc通道獲取到的Ready實例之后,會通過node.Advance()方法向該通道寫入信號,從而通知底層raft實例
  8. tickc      chan struct{}                //用來接收邏輯時鐘發(fā)出的信號,之后會根據(jù)當(dāng)前節(jié)點的角色推進選舉計時器和心跳計時器
  9. done       chan struct{}           //當(dāng)檢測到done通道關(guān)閉后,在其上阻塞的goroutine會繼續(xù)執(zhí)行,并進行相應(yīng)的關(guān)閉操作
  10. stop       chan struct{}           //當(dāng)node.Stop()方法被調(diào)用時,會向該通道發(fā)送信號,在后續(xù)介紹中會提到,有另一個goroutine會嘗試讀取該通道中的內(nèi)容,當(dāng)讀取到信息之后,會關(guān)閉done通道。
  11. status     chan chan Status        //注意該通道的類型,其中傳遞的元素也是Channel類型,即node.Status()方法的返回值
  12.  rn        *RawNode
  13. }

下面我們來看看 raft StateMachine 的狀態(tài)機轉(zhuǎn)換,實際上就是 raft 算法中各種角色的轉(zhuǎn)換。每個 raft 節(jié)點,可能具有以下三種狀態(tài)中的一種。

  • Candidate:候選人狀態(tài),該狀態(tài)意味著將進行一次新的選舉。
  • Follower:跟隨者狀態(tài),該狀態(tài)意味著選舉結(jié)束。
  • Leader:領(lǐng)導(dǎo)者狀態(tài),選舉出來的節(jié)點,所有數(shù)據(jù)提交都必須先提交到 Leader 上。

每一個狀態(tài)都有其對應(yīng)的狀態(tài)機,每次收到一條提交的數(shù)據(jù)時,都會根據(jù)其不同的狀態(tài)將消息輸入到不同狀態(tài)的狀態(tài)機中。同時,在進行 tick 操作時,每種狀態(tài)對應(yīng)的處理函數(shù)也是不一樣的。因此 raft 結(jié)構(gòu)體中將不同的狀態(tài)及其不同的處理函數(shù),獨立出來幾個成員變量:

  • state,保存當(dāng)前節(jié)點狀態(tài);
  • tick 函數(shù),每個狀態(tài)對應(yīng)的 tick 函數(shù)不同;
  • step,狀態(tài)機函數(shù),同樣每個狀態(tài)對應(yīng)的狀態(tài)機也不相同

我們接著看 etcd-raft 狀態(tài)轉(zhuǎn)換。etcd-raft StateMachine 封裝在 raft機構(gòu)體中,etcd為了不讓entry落后的太多的直接進行選舉,多了一個其PreCandidate狀態(tài),轉(zhuǎn)換如下圖:

raft 狀態(tài)轉(zhuǎn)換的接口都在 raft.go 中,其定義如下:

 
 
 
 
  1. //在newRaft()函數(shù)中完成初始化之后,會調(diào)用 becomeFollower()方法將節(jié)點切換成 Follower狀態(tài),其中會設(shè)置raft實例的多個字段
  2. func (r *raft) becomeFollower(term uint64, lead uint64) {
  3.  r.step = stepFollower //設(shè)置函數(shù)處理Follower節(jié)點處理消息的行為
  4.  r.reset(term) //在reset()方法中會重置raft實例的多個字段
  5.  r.tick = r.tickElection //將tick字段設(shè)置成tickElection函數(shù)
  6.  r.lead = lead //設(shè)置當(dāng)前節(jié)點的leader節(jié)點
  7.     //修改當(dāng)前節(jié)點的角色
  8.  r.state = StateFollower
  9. }
  10. //如果當(dāng)前集群開啟了 PreVote 模式,當(dāng) Follower 節(jié)點的選舉計時器超時時,會先調(diào)用becomePreCandidate()方法切換到PreCandidate狀態(tài),becomePreCandidate()
  11. func (r *raft) becomePreCandidate() {
  12.     //檢查當(dāng)前節(jié)點的狀態(tài),禁止leader直接切換到PreCandidate狀態(tài)
  13.  if r.state == StateLeader {
  14.   panic("invalid transition [leader -> pre-candidate]")
  15.  }
  16.     //設(shè)置函數(shù)處理Candidate節(jié)點處理消息的行為
  17.  r.step = stepCandidate 
  18.  r.prs.ResetVotes()
  19.  r.tick = r.tickElection
  20.  r.lead = None
  21.     //修改當(dāng)前節(jié)點的角色
  22.  r.state = StatePreCandidate 
  23. }
  24. //當(dāng)節(jié)點可以連接到集群中半數(shù)以上的節(jié)點時,會調(diào)用 becomeCandidate()方法切換到Candidate狀態(tài),becomeCandidate()
  25. func (r *raft) becomeCandidate() {
  26.  // TODO(xiangli) remove the panic when the raft implementation is stable
  27.  if r.state == StateLeader {
  28.   panic("invalid transition [leader -> candidate]")
  29.  }
  30.     //在reset()方法中會重置raft實例的多個字段
  31.  r.step = stepCandidate
  32.  r.reset(r.Term + 1) //在reset()方法中會重置raft實例的多個字段
  33.  r.tick = r.tickElection
  34.  r.Vote = r.id //在此次的選舉中,Candidate節(jié)點會將選票投給自己
  35.     //修改當(dāng)前節(jié)點的角色
  36.  r.state = StateCandidate
  37. }
  38. //當(dāng) Candidate 節(jié)點得到集群中半數(shù)以上節(jié)點的選票時,會調(diào)用 becomeLeader()方法切換成Leader狀態(tài),becomeLeader()
  39. func (r *raft) becomeLeader() {
  40.     //檢查當(dāng)前節(jié)點的狀態(tài),機制從follower直接切換成leader狀態(tài)
  41.  if r.state == StateFollower {
  42.   panic("invalid transition [follower -> leader]")
  43.  }
  44.  r.step = stepLeader
  45.  r.reset(r.Term) //在reset()方法中會重置raft實例的多個字段
  46.  r.tick = r.tickHeartbeat
  47.  r.lead = r.id //將leader字段設(shè)置成當(dāng)前節(jié)點的id
  48.  r.state = StateLeader //更新當(dāng)前節(jié)點的角色
  49.     //檢查未提交的記錄中是否存在多條集群配置變更的Entry記錄
  50.  r.prs.Progress[r.id].BecomeReplicate()
  51.  r.pendingConfIndex = r.raftLog.lastIndex()
  52.  emptyEnt := pb.Entry{Data: nil}
  53.     //向當(dāng)前節(jié)點的raftLog中追加一條空的Entry記錄
  54.  if !r.appendEntry(emptyEnt) {
  55.     }
  56.  r.reduceUncommittedSize([]pb.Entry{emptyEnt})
  57. }

tick 函數(shù),每個狀態(tài)對應(yīng)的 tick 函數(shù)不同,下面分析兩個tick:

 
 
 
 
  1. func (r *raft) tickElection() {
  2.  r.electionElapsed++ //遞增electionElapsed計時器
  3.  if r.promotable() && r.pastElectionTimeout() { //檢查是否在集群中與檢查單簽的選舉計時器是否超時
  4.   r.electionElapsed = 0
  5.   r.Step(pb.Message{From: r.id, Type: pb.MsgHup}) //發(fā)起step處理pb.MsgHup類型消息。
  6.  }
  7. }
  8. func (r *raft) tickHeartbeat() {
  9.  r.heartbeatElapsed++ //遞增heartbeatElapsed計時器
  10.  r.electionElapsed++ //遞增electionElapsed計時器
  11.  if r.electionElapsed >= r.electionTimeout {
  12.   r.electionElapsed = 0 //重置選舉計時器,leader節(jié)點不會主動發(fā)起選舉
  13.   if r.checkQuorum { //進行多數(shù)檢查
  14.    r.Step(pb.Message{From: r.id, Type: pb.MsgCheckQuorum}) //發(fā)起大多數(shù)檢查。
  15.   }
  16.         //選舉計時器處于electionElapsed~randomizedElectionTimeout時段之間時,不能進行l(wèi)eader轉(zhuǎn)移
  17.   if r.state == StateLeader && r.leadTransferee != None {
  18.    r.abortLeaderTransfer() //清空raft.leadTransferee字段,放棄轉(zhuǎn)移
  19.   }
  20.  }
  21.  if r.state != StateLeader { //只有l(wèi)aeder能發(fā)送tickHeartbeat
  22.   return
  23.  }
  24.  if r.heartbeatElapsed >= r.heartbeatTimeout { //心跳計時器超時
  25.   r.heartbeatElapsed = 0 //重置心跳計時器
  26.   r.Step(pb.Message{From: r.id, Type: pb.MsgBeat}) //發(fā)起step處理MsgBeat類型消息
  27.  }
  28. }

跟隨者、預(yù)選候選人、候選人、領(lǐng)導(dǎo)者 4 種節(jié)點狀態(tài)都有分別對應(yīng)的功能函數(shù),當(dāng)需要查看各節(jié)點狀態(tài)相關(guān)的功能實現(xiàn)時(比如,跟隨者如何接收和處理日志),都可以將對應(yīng)的函數(shù)作為入口函數(shù),來閱讀代碼和研究功能實現(xiàn)。

日志復(fù)制

這里重點看一下raft.appendEntry()方法,它的主要操作步驟如下:(1)設(shè)置待追加的Entry記錄的Term值和Index值。

(2)向當(dāng)前節(jié)點的raftLog中追加Entry記錄。

(3)更新當(dāng)前節(jié)點對應(yīng)的Progress實例。

(4)嘗試提交Entry記錄,即修改raftLog.committed字段的值。

raft.appendEntry()方法的具體實現(xiàn)如下:

 
 
 
 
  1. func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) {
  2.  li := r.raftLog.lastIndex()//獲取raftLog中最后一條記錄的索引值
  3.  for i := range es {//更新待追加記錄的Term值和索引值
  4.   es[i].Term = r.Term//Entry記錄的Term指定為當(dāng)前l(fā)eader節(jié)點的任期號
  5.   es[i].Index = li + 1 + uint64(i) //為日志記錄指定的Index
  6.  }
  7.  li = r.raftLog.append(es...)//向raft中追加記錄
  8.     //更新當(dāng)前節(jié)點對應(yīng)的Progress,主要是更新Next和Match
  9.  r.prs.Progress[r.id].MaybeUpdate(li)
  10.     //嘗試提交Entry記錄
  11.  r.maybeCommit()
  12.  return true
  13. }

在Progress.mayUpdate()方法中,會嘗試修改Match字段和Next字段,用來標(biāo)識對應(yīng)節(jié)點Entry記錄復(fù)制的情況。Leader節(jié)點除了在向自身raftLog中追加記錄時(即appendEntry()方法)會調(diào)用該方法,當(dāng)Leader節(jié)點收到Follower節(jié)點的MsgAppResp消息(即MsgApp消息的響應(yīng)消息)時,也會調(diào)用該方法嘗試修改Follower節(jié)點對應(yīng)的Progress實例。Progress.MayUpdate()方法的具體實現(xiàn)如下:

 
 
 
 
  1. func (pr *Progress) MaybeUpdate(n uint64) bool {
  2.  var updated bool
  3.  if pr.Match < n {
  4.   pr.Match = n //n之前所有的Entry記錄都已經(jīng)寫入對應(yīng)節(jié)點的raftLog中
  5.   updated = true
  6.         //下面將Progress.paused設(shè)置為false,表示leader節(jié)點可以繼續(xù)向?qū)?yīng)Follower
  7.         //節(jié)點發(fā)送MsgApp消息
  8.   pr.ProbeAcked()
  9.  }
  10.  pr.Next = max(pr.Next, n+1)//將Next值加一,下一次復(fù)制Entry記錄開始的位置
  11.  return updated
  12. }

如果該Entry記錄已經(jīng)復(fù)制到了半數(shù)以上的節(jié)點中,則在raft.maybeCommit()方法中會嘗試將其提交。除了 appendEntry()方法,在 Leader 節(jié)點每次收到 MsgAppResp 消息時也會調(diào)用maybeCommit()方法,maybeCommit()方法的具體實現(xiàn)如下:

 
 
 
 
  1. func (r *raft) maybeCommit() bool {
  2.  mci := r.prs.Committed()
  3.  return r.raftLog.maybeCommit(mci, r.Term)
  4. }
  5. func (p *ProgressTracker) Committed() uint64 {
  6.  return uint64(p.Voters.CommittedIndex(matchAckIndexer(p.Progress)))
  7. }
  8. //將node分兩個組,JointConfig是大多數(shù)的組,有興趣的看一看quorum包的實現(xiàn)
  9. func (c JointConfig) CommittedIndex(l AckedIndexer) Index {//比較大多數(shù)的node的前倆個Index,返回Match的值。
  10.  idx0 := c[0].CommittedIndex(l)
  11.  idx1 := c[1].CommittedIndex(l)
  12.  if idx0 < idx1 {
  13.   return idx0
  14.  }
  15.  return idx1
  16. }
  17. //更新raftLog.committed字段,完成提交
  18. func (l *raftLog) maybeCommit(maxIndex, term uint64) bool {
  19.  if maxIndex > l.committed && l.zeroTermOnErrCompacted(l.term(maxIndex)) == term {
  20.   l.commitTo(maxIndex)
  21.   return true
  22.  }
  23.  return false
  24. }

etcd 將 raft 相關(guān)的所有處理都抽象為了 Message,通過 Step 接口處理各類消息的入口,首先根據(jù)Term"值"對消息進行分類處理,再根據(jù)消息的"類型"進行分類處理:

 
 
 
 
  1. func (r *raft) Step(m pb.Message) error {
  2.  switch {//首先根據(jù)消息的Term值進行分類處理
  3.  case m.Term == 0://本地消息不做處理。MsgHup,MsgProp和MsgReadIndex是本地消息
  4.  case m.Term > r.Term:
  5.  case m.Term < r.Term://細節(jié)部分,可以自己研究源碼
  6.  }
  7.  switch m.Type {//根據(jù)Message的Type進行分類處理
  8.  case pb.MsgHup://這里針對MsgHup類型的消息進行處理。
  9.   if r.preVote {//檢查是不是開啟了preVote,如果是開啟了先調(diào)用raft.hup方法,發(fā)起preVote。
  10.   } else {
  11.    r.hup(campaignElection)//下面講述
  12.   }
  13.  case pb.MsgVote, pb.MsgPreVote: //對MsgVote,MsgPreVote類型的消息進行處理。
  14.   canVote := r.Vote == m.From ||
  15.    (r.Vote == None && r.lead == None) ||
  16.    (m.Type == pb.MsgPreVote && m.Term > r.Term)
  17.   if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
  18.    r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)})
  19.    if m.Type == pb.MsgVote {
  20.     r.electionElapsed = 0
  21.     r.Vote = m.From
  22.    }
  23.   } else {
  24.    r.send(pb.Message{To: m.From, Term: r.Term, Type: voteRespMsgType(m.Type), Reject: true})
  25.   }
  26.  default://對于其他類型的消息處理,對應(yīng)的node的step函數(shù)處理
  27.   err := r.step(r, m)
  28.   if err != nil {
  29.    return err
  30.   }
  31.  }
  32.  return nil
  33. }

這里主要使用hup函數(shù)對Message來做處理,在raft.campaign()方法中,除了完成狀態(tài)切換,還會向集群中的其他節(jié)點發(fā)送相應(yīng)類型的消息,例如,如果當(dāng)前 Follower 節(jié)點要切換成 PreCandidate 狀態(tài),則會發(fā)送 MsgPreVote 消息:

 
 
 
 
  1. func (r *raft) hup(t CampaignType) {
  2.  if r.state == StateLeader {//忽略leader
  3.   return
  4.  }
  5.     //方法會檢查prs字段中是否還存在當(dāng)前節(jié)點對應(yīng)的Progress實例,這是為了監(jiān)測當(dāng)前節(jié)點是否被從集群中移除了
  6.     if !r.promotable() {
  7.   return
  8.  }
  9.     //獲取raftLog中已提交但未應(yīng)用的Entry記錄,異常處理
  10.  ents, err := r.raftLog.slice(r.raftLog.applied+1, r.raftLog.committed+1, noLimit)
  11.  r.campaign(t)
  12. }
  13. func (r *raft) campaign(t CampaignType) {
  14.     //該方法的會發(fā)送一條包含Term值和類型
  15.  var term uint64
  16.  var voteMsg pb.MessageType
  17.  if t == campaignPreElection {//切換的目標(biāo)狀態(tài)是Precandidate
  18.   r.becomePreCandidate()
  19.   voteMsg = pb.MsgPreVote
  20.         //確定要發(fā)送的Term值,這里只是增加了消息的Term值,并未增加raft.term字段的值
  21.   term = r.Term + 1
  22.  } else {//切換的目標(biāo)狀態(tài)是Candidate
  23.   r.becomeCandidate()
  24.   voteMsg = pb.MsgVote
  25.         //給raft.Term字段的值,并將當(dāng)前節(jié)點的選票投給自身
  26.   term = r.Term
  27.  }
  28.  if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == quorum.VoteWon {
  29.         //當(dāng)?shù)玫阶銐虻倪x票時,則將PreCandidate狀態(tài)的節(jié)點切換成Candidate狀態(tài)
  30.         //Candidate狀態(tài)的節(jié)點則切換成Leader狀態(tài)
  31.   if t == campaignPreElection {
  32.    r.campaign(campaignElection)
  33.   } else {
  34.    r.becomeLeader()
  35.   }
  36.   return
  37.  }
  38.  var ids []uint64
  39.  {
  40.   idMap := r.prs.Voters.IDs()
  41.   ids = make([]uint64, 0, len(idMap))
  42.   for id := range idMap {
  43.    ids = append(ids, id)
  44.   }
  45.   sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
  46.  }
  47.  for _, id := range ids {//狀態(tài)切換完成之后,當(dāng)前節(jié)點會向集群中所有節(jié)點發(fā)送指定類型的消息
  48.   if id == r.id { //跳過當(dāng)前節(jié)點自身
  49.    continue
  50.   }
  51.         var ctx []byte
  52.         //在進行Leader節(jié)點轉(zhuǎn)移時,MsgPreVote或MsgVote消息會在Context字段中設(shè)置該特殊值
  53.   if t == campaignTransfer {
  54.    ctx = []byte(t)
  55.   }
  56.         //發(fā)送指定類型的消息,其中Index和LogTerm分別是當(dāng)前節(jié)點的raftLog
  57.         //最后一條消息的Index值和Term值
  58.   r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
  59.  }
  60. }

Follower 節(jié)點在選舉計時器超時的行為:首先它會通過 tickElection()創(chuàng)建MsgHup消息并將其交給raft.Step()方法進行處理;raft.Step()方法會將當(dāng)前Follower節(jié)點切換成PreCandidate狀態(tài),然后創(chuàng)建MsgPreVote類型的消息,最后將該消息追加到raft.msgs字段中,等待上層模塊將其發(fā)送出去。

本文轉(zhuǎn)載自微信公眾號「運維開發(fā)故事」,可以通過以下二維碼關(guān)注。轉(zhuǎn)載本文請聯(lián)系運維開發(fā)故事公眾號。


網(wǎng)頁標(biāo)題:一篇文章帶給你Etcd-Raft學(xué)習(xí)
分享URL:http://www.5511xx.com/article/cohhgci.html