日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢(xún)
選擇下列產(chǎn)品馬上在線(xiàn)溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問(wèn)題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷(xiāo)解決方案
Go語(yǔ)言如何操縱Kafka保證無(wú)消息丟失

go語(yǔ)言如何操縱Kafka保證無(wú)消息丟失

作者: AsongGo 2021-09-13 07:23:53

開(kāi)發(fā)

后端

Kafka Kafka是由Apache軟件基金會(huì)開(kāi)發(fā)的一個(gè)開(kāi)源流處理平臺(tái),由Scala和Java編寫(xiě)。該項(xiàng)目的目標(biāo)是為處理實(shí)時(shí)數(shù)據(jù)提供一個(gè)統(tǒng)一、高吞吐、低延遲的平臺(tái)。其持久化層本質(zhì)上是一個(gè)“按照分布式事務(wù)日志架構(gòu)的大規(guī)模發(fā)布/訂閱消息隊(duì)列”,這使它作為企業(yè)級(jí)基礎(chǔ)設(shè)施來(lái)處理流式數(shù)據(jù)非常有價(jià)值。

創(chuàng)新互聯(lián)建站專(zhuān)注于南芬企業(yè)網(wǎng)站建設(shè),響應(yīng)式網(wǎng)站設(shè)計(jì),成都做商城網(wǎng)站。南芬網(wǎng)站建設(shè)公司,為南芬等地區(qū)提供建站服務(wù)。全流程按需定制,專(zhuān)業(yè)設(shè)計(jì),全程項(xiàng)目跟蹤,創(chuàng)新互聯(lián)建站專(zhuān)業(yè)和態(tài)度為您提供的服務(wù)

[[423396]]

背景

目前一些互聯(lián)網(wǎng)公司會(huì)使用消息隊(duì)列來(lái)做核心業(yè)務(wù),因?yàn)槭呛诵臉I(yè)務(wù),所以對(duì)數(shù)據(jù)的最后一致性比較敏感,如果中間出現(xiàn)數(shù)據(jù)丟失,就會(huì)引來(lái)用戶(hù)的投訴,年底績(jī)效就變成325了。之前和幾個(gè)朋友聊天,他們的公司都在用kafka來(lái)做消息隊(duì)列,使用kafka到底會(huì)不會(huì)丟消息呢?如果丟消息了該怎么做好補(bǔ)償措施呢?本文我們就一起來(lái)分析一下,并介紹如何使用Go操作Kafka可以不丟失數(shù)據(jù)。

本文操作kafka基于:https://github.com/Shopify/sarama

初識(shí)kafka架構(gòu)

維基百科對(duì)kafka的介紹:

Kafka是由Apache軟件基金會(huì)開(kāi)發(fā)的一個(gè)開(kāi)源流處理平臺(tái),由Scala和Java編寫(xiě)。該項(xiàng)目的目標(biāo)是為處理實(shí)時(shí)數(shù)據(jù)提供一個(gè)統(tǒng)一、高吞吐、低延遲的平臺(tái)。其持久化層本質(zhì)上是一個(gè)“按照分布式事務(wù)日志架構(gòu)的大規(guī)模發(fā)布/訂閱消息隊(duì)列”,這使它作為企業(yè)級(jí)基礎(chǔ)設(shè)施來(lái)處理流式數(shù)據(jù)非常有價(jià)值。此外,Kafka可以通過(guò)Kafka Connect連接到外部系統(tǒng)(用于數(shù)據(jù)輸入/輸出),并提供了Kafka Streams——一個(gè)Java]流式處理庫(kù)。該設(shè)計(jì)受事務(wù)日志的影響較大。

kafka的整體架構(gòu)比較簡(jiǎn)單,主要由producer、broker、consumer組成:

截屏2021-09-12 上午10.00.13

針對(duì)架構(gòu)圖我們解釋一個(gè)各個(gè)模塊:

  • Producer:數(shù)據(jù)的生產(chǎn)者,可以將數(shù)據(jù)發(fā)布到所選擇的topic中。
  • Consumer:數(shù)據(jù)的消費(fèi)者,使用Consumer Group進(jìn)行標(biāo)識(shí),在topic中的每條記錄都會(huì)被分配給訂閱消費(fèi)組中的一個(gè)消費(fèi)者實(shí)例,消費(fèi)者實(shí)例可以分布在多個(gè)進(jìn)程中或者多個(gè)機(jī)器上。
  • Broker:消息中間件處理節(jié)點(diǎn)(服務(wù)器),一個(gè)節(jié)點(diǎn)就是一個(gè)broker,一個(gè)Kafka集群由一個(gè)或多個(gè)broker組成。

還有些概念我們也介紹一下:

  • topic:可以理解為一個(gè)消息的集合,topic存儲(chǔ)在broker中,一個(gè)topic可以有多個(gè)partition分區(qū),一個(gè)topic可以有多個(gè)Producer來(lái)push消息,一個(gè)topic可以有多個(gè)消費(fèi)者向其pull消息,一個(gè)topic可以存在一個(gè)或多個(gè)broker中。
  • partition:其是topic的子集,不同分區(qū)分配在不同的broker上進(jìn)行水平擴(kuò)展從而增加kafka并行處理能力,同topic下的不同分區(qū)信息是不同的,同一分區(qū)信息是有序的;每一個(gè)分區(qū)都有一個(gè)或者多個(gè)副本,其中會(huì)選舉一個(gè)leader,fowller從leader拉取數(shù)據(jù)更新自己的log(每個(gè)分區(qū)邏輯上對(duì)應(yīng)一個(gè)log文件夾),消費(fèi)者向leader中pull信息。

kafka丟消息的三個(gè)節(jié)點(diǎn)

生產(chǎn)者push消息節(jié)點(diǎn)

先看一下producer的大概寫(xiě)入流程:

  • producer先從kafka集群找到該partition的leader
  • producer將消息發(fā)送給leader,leader將該消息寫(xiě)入本地
  • follwers從leader pull消息,寫(xiě)入本地log后leader發(fā)送ack
  • leader 收到所有 ISR 中的 replica 的 ACK 后,增加high watermark,并向 producer 發(fā)送 ack

截屏2021-09-12 上午11.16.43

通過(guò)這個(gè)流程我們可以看到kafka最終會(huì)返回一個(gè)ack來(lái)確認(rèn)推送消息結(jié)果,這里kafka提供了三種模式:

  
 
 
 
  1. NoResponse RequiredAcks = 0 
  2. WaitForLocal RequiredAcks = 1 
  3. WaitForAll RequiredAcks = -1 
  • NoResponse RequiredAcks = 0:這個(gè)代表的就是數(shù)據(jù)推出的成功與否都與我無(wú)關(guān)了
  • WaitForLocal RequiredAcks = 1:當(dāng)local(leader)確認(rèn)接收成功后,就可以返回了
  • WaitForAll RequiredAcks = -1:當(dāng)所有的leader和follower都接收成功時(shí),才會(huì)返回

所以根據(jù)這三種模式我們就能推斷出生產(chǎn)者在push消息時(shí)有一定幾率丟失的,分析如下:

  • 如果我們選擇了模式1,這種模式丟失數(shù)據(jù)的幾率很大,無(wú)法重試
  • 如果我們選擇了模式2,這種模式下只要leader不掛,就可以保證數(shù)據(jù)不丟失,但是如果leader掛了,follower還沒(méi)有同步數(shù)據(jù),那么就會(huì)有一定幾率造成數(shù)據(jù)丟失
  • 如果選擇了模式3,這種情況不會(huì)造成數(shù)據(jù)丟失,但是有可能會(huì)造成數(shù)據(jù)重復(fù),假如leader與follower同步數(shù)據(jù)是網(wǎng)絡(luò)出現(xiàn)問(wèn)題,就有可能造成數(shù)據(jù)重復(fù)的問(wèn)題。

所以在生產(chǎn)環(huán)境中我們可以選擇模式2或者模式3來(lái)保證消息的可靠性,具體需要根據(jù)業(yè)務(wù)場(chǎng)景來(lái)進(jìn)行選擇,在乎吞吐量就選擇模式2,不在乎吞吐量,就選擇模式3,要想完全保證數(shù)據(jù)不丟失就選擇模式3是最可靠的。

kafka集群自身故障造成

kafka集群接收到數(shù)據(jù)后會(huì)將數(shù)據(jù)進(jìn)行持久化存儲(chǔ),最終數(shù)據(jù)會(huì)被寫(xiě)入到磁盤(pán)中,在寫(xiě)入磁盤(pán)這一步也是有可能會(huì)造成數(shù)據(jù)損失的,因?yàn)閷?xiě)入磁盤(pán)的時(shí)候操作系統(tǒng)會(huì)先將數(shù)據(jù)寫(xiě)入緩存,操作系統(tǒng)將緩存中數(shù)據(jù)寫(xiě)入磁盤(pán)的時(shí)間是不確定的,所以在這種情況下,如果kafka機(jī)器突然宕機(jī)了,也會(huì)造成數(shù)據(jù)損失,不過(guò)這種概率發(fā)生很小,一般公司內(nèi)部kafka機(jī)器都會(huì)做備份,這種情況很極端,可以忽略不計(jì)。

消費(fèi)者pull消息節(jié)點(diǎn)

push消息時(shí)會(huì)把數(shù)據(jù)追加到Partition并且分配一個(gè)偏移量,這個(gè)偏移量代表當(dāng)前消費(fèi)者消費(fèi)到的位置,通過(guò)這個(gè)Partition也可以保證消息的順序性,消費(fèi)者在pull到某個(gè)消息后,可以設(shè)置自動(dòng)提交或者手動(dòng)提交commit,提交commit成功,offset就會(huì)發(fā)生偏移:

截屏2021-09-12 下午3.37.33

所以自動(dòng)提交會(huì)帶來(lái)數(shù)據(jù)丟失的問(wèn)題,手動(dòng)提交會(huì)帶來(lái)數(shù)據(jù)重復(fù)的問(wèn)題,分析如下:

  • 在設(shè)置自動(dòng)提交的時(shí)候,當(dāng)我們拉取到一個(gè)消息后,此時(shí)offset已經(jīng)提交了,但是我們?cè)谔幚硐M(fèi)邏輯的時(shí)候失敗了,這就會(huì)導(dǎo)致數(shù)據(jù)丟失了
  • 在設(shè)置手動(dòng)提交時(shí),如果我們是在處理完消息后提交commit,那么在commit這一步發(fā)生了失敗,就會(huì)導(dǎo)致重復(fù)消費(fèi)的問(wèn)題。

比起數(shù)據(jù)丟失,重復(fù)消費(fèi)是符合業(yè)務(wù)預(yù)期的,我們可以通過(guò)一些冪等性設(shè)計(jì)來(lái)規(guī)避這個(gè)問(wèn)題。

實(shí)戰(zhàn)

完整代碼已經(jīng)上傳github:https://github.com/asong2020/Golang_Dream/tree/master/code_demo/kafka_demo

解決push消息丟失問(wèn)題

主要是通過(guò)兩點(diǎn)來(lái)解決:

  • 通過(guò)設(shè)置RequiredAcks模式來(lái)解決,選用WaitForAll可以保證數(shù)據(jù)推送成功,不過(guò)會(huì)影響時(shí)延時(shí)
  • 引入重試機(jī)制,設(shè)置重試次數(shù)和重試間隔

因此我們寫(xiě)出如下代碼(摘出創(chuàng)建client部分):

  
 
 
 
  1. func NewAsyncProducer() sarama.AsyncProducer { 
  2.  cfg := sarama.NewConfig() 
  3.  version, err := sarama.ParseKafkaVersion(VERSION) 
  4.  if err != nil{ 
  5.   log.Fatal("NewAsyncProducer Parse kafka version failed", err.Error()) 
  6.   return nil 
  7.  } 
  8.  cfg.Version = version 
  9.  cfg.Producer.RequiredAcks = sarama.WaitForAll // 三種模式任君選擇 
  10.  cfg.Producer.Partitioner = sarama.NewHashPartitioner 
  11.  cfg.Producer.Return.Successes = true 
  12.  cfg.Producer.Return.Errors = true 
  13.  cfg.Producer.Retry.Max = 3 // 設(shè)置重試3次 
  14.  cfg.Producer.Retry.Backoff = 100 * time.Millisecond 
  15.  cli, err := sarama.NewAsyncProducer([]string{ADDR}, cfg) 
  16.  if err != nil{ 
  17.   log.Fatal("NewAsyncProducer failed", err.Error()) 
  18.   return nil 
  19.  } 
  20.  return cli 

解決pull消息丟失問(wèn)題

這個(gè)解決辦法就比較粗暴了,直接使用自動(dòng)提交的模式,在每次真正消費(fèi)完之后在自己手動(dòng)提交offset,但是會(huì)產(chǎn)生重復(fù)消費(fèi)的問(wèn)題,不過(guò)很好解決,使用冪等性操作即可解決。

代碼示例:

  
 
 
 
  1. func NewConsumerGroup(group string) sarama.ConsumerGroup { 
  2.  cfg := sarama.NewConfig() 
  3.  version, err := sarama.ParseKafkaVersion(VERSION) 
  4.  if err != nil{ 
  5.   log.Fatal("NewConsumerGroup Parse kafka version failed", err.Error()) 
  6.   return nil 
  7.  } 
  8.  
  9.  cfg.Version = version 
  10.  cfg.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange 
  11.  cfg.Consumer.Offsets.Initial = sarama.OffsetOldest 
  12.  cfg.Consumer.Offsets.Retry.Max = 3 
  13.  cfg.Consumer.Offsets.AutoCommit.Enable = true // 開(kāi)啟自動(dòng)提交,需要手動(dòng)調(diào)用MarkMessage才有效 
  14.  cfg.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second // 間隔 
  15.  client, err := sarama.NewConsumerGroup([]string{ADDR}, group, cfg) 
  16.  if err != nil { 
  17.   log.Fatal("NewConsumerGroup failed", err.Error()) 
  18.  } 
  19.  return client 

上面主要是創(chuàng)建ConsumerGroup部分,細(xì)心的讀者應(yīng)該看到了,我們這里使用的是自動(dòng)提交,說(shuō)好的使用手動(dòng)提交呢?這是因?yàn)槲覀冞@個(gè)kafka庫(kù)的特性不同,這個(gè)自動(dòng)提交需要與MarkMessage()方法配合使用才會(huì)提交(有疑問(wèn)的朋友可以實(shí)踐一下,或者看一下源碼),否則也會(huì)提交失敗,因?yàn)槲覀冊(cè)趯?xiě)消費(fèi)邏輯時(shí)要這樣寫(xiě):

  
 
 
 
  1. func (e EventHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { 
  2.  for msg := range claim.Messages() { 
  3.   var data common.KafkaMsg 
  4.   if err := json.Unmarshal(msg.Value, &data); err != nil { 
  5.    return errors.New("failed to unmarshal message err is " + err.Error()) 
  6.   } 
  7.   // 操作數(shù)據(jù),改用打印 
  8.   log.Print("consumerClaim data is ") 
  9.  
  10.   // 處理消息成功后標(biāo)記為處理, 然后會(huì)自動(dòng)提交 
  11.   session.MarkMessage(msg,"") 
  12.  } 
  13.  return nil 

或者直接使用手動(dòng)提交方法來(lái)解決,只需兩步:

第一步:關(guān)閉自動(dòng)提交:

  
 
 
 
  1. consumerConfig.Consumer.Offsets.AutoCommit.Enable = false  // 禁用自動(dòng)提交,改為手動(dòng) 

第二步:消費(fèi)邏輯中添加如下代碼,手動(dòng)提交模式下,也需要先進(jìn)行標(biāo)記,在進(jìn)行commit

  
 
 
 
  1. session.MarkMessage(msg,"") 
  2. session.Commit() 

完整代碼可以到github上下載并進(jìn)行驗(yàn)證!

總結(jié)

本文我們主要說(shuō)明了兩個(gè)知識(shí)點(diǎn):

Kafka會(huì)產(chǎn)生消息丟失

使用Go操作Kafka如何配置可以不丟失數(shù)據(jù)

 

日常業(yè)務(wù)開(kāi)發(fā)中,很多公司都喜歡拿消息隊(duì)列進(jìn)行解耦,那么你就要注意了,使用Kafka做消息隊(duì)列無(wú)法保證數(shù)據(jù)不丟失,需要我們自己手動(dòng)配置補(bǔ)償,別忘記了,要不又是一場(chǎng)P0事故。

 


網(wǎng)頁(yè)名稱(chēng):Go語(yǔ)言如何操縱Kafka保證無(wú)消息丟失
URL網(wǎng)址:http://www.5511xx.com/article/dpgggcc.html