日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問(wèn)題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷解決方案
Kafka核心全面總結(jié),高可靠高性能核心原理探究

你好,我是碼哥,可以叫我靚仔

作者:mo

引言

在探究 Kafka 核心知識(shí)之前,我們先思考一個(gè)問(wèn)題:什么場(chǎng)景會(huì)促使我們使用 Kafka?  說(shuō)到這里,我們頭腦中或多或少會(huì)蹦出異步解耦和削峰填谷等字樣,是的,這就是 Kafka 最重要的落地場(chǎng)景。

異步解耦:同步調(diào)用轉(zhuǎn)換成異步消息通知,實(shí)現(xiàn)生產(chǎn)者和消費(fèi)者的解耦。想象一個(gè)場(chǎng)景,在商品交易時(shí),在訂單創(chuàng)建完成之后,需要觸發(fā)一系列其他的操作,比如進(jìn)行用戶訂單數(shù)據(jù)的統(tǒng)計(jì)、給用戶發(fā)送短信、給用戶發(fā)送郵件等等。如果所有操作都采用同步方式實(shí)現(xiàn),將嚴(yán)重影響系統(tǒng)性能。針對(duì)此場(chǎng)景,我們可以利用消息中間件解耦訂單創(chuàng)建操作和其他后續(xù)行為。

削峰填谷:利用 broker 緩沖上游生產(chǎn)者瞬時(shí)突發(fā)的流量,使消費(fèi)者消費(fèi)流量整體平滑。對(duì)于發(fā)送能力很強(qiáng)的上游系統(tǒng),如果沒(méi)有消息中間件的保護(hù),下游系統(tǒng)可能會(huì)直接被壓垮導(dǎo)致全鏈路服務(wù)雪崩。想象秒殺業(yè)務(wù)場(chǎng)景,上游業(yè)務(wù)發(fā)起下單請(qǐng)求,下游業(yè)務(wù)執(zhí)行秒殺業(yè)務(wù)(庫(kù)存檢查,庫(kù)存凍結(jié),余額凍結(jié),生成訂單等等),下游業(yè)務(wù)處理的邏輯是相當(dāng)復(fù)雜的,并發(fā)能力有限,如果上游服務(wù)不做限流策略,瞬時(shí)可能把下游服務(wù)壓垮。針對(duì)此場(chǎng)景,我們可以利用 MQ 來(lái)做削峰填谷,讓高峰流量填充低谷空閑資源,達(dá)到系統(tǒng)資源的合理利用。

通過(guò)上述例子可以發(fā)現(xiàn)交易、支付等場(chǎng)景常需要異步解耦和削峰填谷功能解決問(wèn)題,而交易、支付等場(chǎng)景對(duì)性能、可靠性要求特別高。那么,我們本文的主角 Kafka 能否滿足相應(yīng)要求呢?下面我們來(lái)探討下。

Kafka 宏觀認(rèn)知

在探究 Kafka 的高性能、高可靠性之前,我們從宏觀上來(lái)看下 Kafka 的系統(tǒng)架構(gòu):

如上圖所示,Kafka 由 Producer、Broker、Consumer 以及負(fù)責(zé)集群管理的 ZooKeeper 組成,各部分功能如下:

  • Producer:生產(chǎn)者,負(fù)責(zé)消息的創(chuàng)建并通過(guò)一定的路由策略發(fā)送消息到合適的 Broker;
  • Broker:服務(wù)實(shí)例,負(fù)責(zé)消息的持久化、中轉(zhuǎn)等功能;
  • Consumer :消費(fèi)者,負(fù)責(zé)從 Broker 中拉?。≒ull)訂閱的消息并進(jìn)行消費(fèi),通常多個(gè)消費(fèi)者構(gòu)成一個(gè)分組,消息只能被同組中的一個(gè)消費(fèi)者消費(fèi);
  • ZooKeeper:負(fù)責(zé) broker、consumer 集群元數(shù)據(jù)的管理等;(注意:Producer 端直接連接 broker,不在 zk 上存任何數(shù)據(jù),只是通過(guò) ZK 監(jiān)聽(tīng) broker 和 topic 等信息)

上圖消息流轉(zhuǎn)過(guò)程中,還有幾個(gè)特別重要的概念—主題(Topic)、分區(qū)(Partition)、分段(segment)、位移(offset)。

  • topic:消息主題。Kafka 按 topic 對(duì)消息進(jìn)行分類,我們?cè)谑瞻l(fā)消息時(shí)只需指定 topic。
  • partition:分區(qū)。為了提升系統(tǒng)的吞吐,一個(gè) topic 下通常有多個(gè) partition,partition 分布在不同的 Broker 上,用于存儲(chǔ) topic 的消息,這使 Kafka 可以在多臺(tái)機(jī)器上處理、存儲(chǔ)消息,給 kafka 提供給了并行的消息處理能力和橫向擴(kuò)容能力。另外,為了提升系統(tǒng)的可靠性,partition 通常會(huì)分組,且每組有一個(gè)主 partition、多個(gè)副本 partition,且分布在不同的 broker 上,從而起到容災(zāi)的作用。
  • segment:分段。宏觀上看,一個(gè) partition 對(duì)應(yīng)一個(gè)日志(Log)。由于生產(chǎn)者生產(chǎn)的消息會(huì)不斷追加到 log 文件末尾,為防止 log 文件過(guò)大導(dǎo)致數(shù)據(jù)檢索效率低下,Kafka 采取了分段和索引機(jī)制,將每個(gè) partition 分為多個(gè) segment,同時(shí)也便于消息的維護(hù)和清理。每個(gè) segment 包含一個(gè).log 日志文件、兩個(gè)索引(.index、timeindex)文件以及其他可能的文件。每個(gè) Segment 的數(shù)據(jù)文件以該段中最小的 offset 為文件名,當(dāng)查找 offset 的 Message 的時(shí)候,通過(guò)二分查找快找到 Message 所處于的 Segment 中。
  • offset:消息在日志中的位置,消息在被追加到分區(qū)日志文件的時(shí)候都會(huì)分配一個(gè)特定的偏移量。offset 是消息在分區(qū)中的唯一標(biāo)識(shí),是一個(gè)單調(diào)遞增且不變的值。Kafka 通過(guò)它來(lái)保證消息在分區(qū)內(nèi)的順序性,不過(guò) offset 并不跨越分區(qū),也就是說(shuō),Kafka 保證的是分區(qū)有序而不是主題有序。

Kafka 高可靠性、高性能探究

在對(duì) Kafka 的整體系統(tǒng)框架及相關(guān)概念簡(jiǎn)單了解后,下面我們來(lái)進(jìn)一步深入探討下高可靠性、高性能實(shí)現(xiàn)原理。

Kafka 高可靠性探究

Kafka 高可靠性的核心是保證消息在傳遞過(guò)程中不丟失,涉及如下核心環(huán)節(jié):

  • 消息從生產(chǎn)者可靠地發(fā)送至 Broker;-- 網(wǎng)絡(luò)、本地丟數(shù)據(jù);
  • 發(fā)送到 Broker 的消息可靠持久化;-- Pagecache 緩存落盤、單點(diǎn)崩潰、主從同步跨網(wǎng)絡(luò);
  • 消費(fèi)者從 Broker 消費(fèi)到消息且最好只消費(fèi)一次 -- 跨網(wǎng)絡(luò)消息傳輸 。
消息從生產(chǎn)者可靠地發(fā)送至 Broker

為了保障消息從生產(chǎn)者可靠地發(fā)送至 Broker,我們需要確保兩點(diǎn);

  1. Producer 發(fā)送消息后,能夠收到來(lái)自 Broker 的消息保存成功 ack;
  2. Producer 發(fā)送消息后,能夠捕獲超時(shí)、失敗 ack 等異常 ack 并做處理。
ack 策略

針對(duì)問(wèn)題 1,Kafka 為我們提供了三種 ack 策略,

  • Request.required.acks = 0:請(qǐng)求發(fā)送即認(rèn)為成功,不關(guān)心有沒(méi)有寫成功,常用于日志進(jìn)行分析場(chǎng)景;
  • Request.required.acks = 1:當(dāng) leader partition 寫入成功以后,才算寫入成功,有丟數(shù)據(jù)的可能;
  • Request.required.acks= -1:ISR 列表里面的所有副本都寫完以后,這條消息才算寫入成功,強(qiáng)可靠性保證;

為了實(shí)現(xiàn)強(qiáng)可靠的 kafka 系統(tǒng),我們需要設(shè)置 Request.required.acks= -1,同時(shí)還會(huì)設(shè)置集群中處于正常同步狀態(tài)的副本 follower 數(shù)量 min.insync.replicas>2,另外,設(shè)置 unclean.leader.election.enable=false 使得集群中 ISR 的 follower 才可變成新的 leader,避免特殊情況下消息截?cái)嗟某霈F(xiàn)。

消息發(fā)送策略

針對(duì)問(wèn)題 2,kafka 提供兩類消息發(fā)送方式:同步(sync)發(fā)送和異步(async)發(fā)送,相關(guān)參數(shù)如下:

以 sarama 實(shí)現(xiàn)為例,在消息發(fā)送的過(guò)程中,無(wú)論是同步發(fā)送還是異步發(fā)送都會(huì)涉及到兩個(gè)協(xié)程--負(fù)責(zé)消息發(fā)送的主協(xié)程和負(fù)責(zé)消息分發(fā)的 dispatcher 協(xié)程。

異步發(fā)送

對(duì)于異步發(fā)送(ack != 0 場(chǎng)景,等于 0 時(shí)不關(guān)心寫 kafka 結(jié)果,后文詳細(xì)講解)而言,其流程大概如下:

  1. 在主協(xié)程中調(diào)用異步發(fā)送 kafka 消息的時(shí)候,其本質(zhì)是將消息體放進(jìn)了一個(gè) input 的 channel,只要入 channel 成功,則這個(gè)函數(shù)直接返回,不會(huì)產(chǎn)生任何阻塞。相反,如果入 channel 失敗,則會(huì)返回錯(cuò)誤信息。因此調(diào)用 async 寫入的時(shí)候返回的錯(cuò)誤信息是入 channel 的錯(cuò)誤信息,至于具體最終消息有沒(méi)有發(fā)送到 kafka 的 broker,我們無(wú)法從返回值得知。
  2. 當(dāng)消息進(jìn)入 input 的 channel 后,會(huì)有另一個(gè)dispatcher 的協(xié)程負(fù)責(zé)遍歷 input,來(lái)真正發(fā)送消息到特定 Broker 上的主 Partition 上。發(fā)送結(jié)果通過(guò)一個(gè)異步協(xié)程進(jìn)行監(jiān)聽(tīng),循環(huán)處理 err channel 和 success channel,出現(xiàn)了 error 就記一個(gè)日志。因此異步寫入場(chǎng)景時(shí),寫 kafka 的錯(cuò)誤信息,我們暫時(shí)僅能夠從這個(gè)錯(cuò)誤日志來(lái)得知具體發(fā)生了什么錯(cuò),并且也不支持我們自建函數(shù)進(jìn)行兜底處理,這一點(diǎn)在 trpc-go 的官方也得到了承認(rèn)。

同步發(fā)送

同步發(fā)送(ack != 0 場(chǎng)景)是在異步發(fā)送的基礎(chǔ)上加以條件限制實(shí)現(xiàn)的。同步消息發(fā)送在 newSyncProducerFromAsyncProducer 中開(kāi)啟兩個(gè)異步協(xié)程處理消息成功與失敗的“回調(diào)”,并使用 waitGroup 進(jìn)行等待,從而將異步操作轉(zhuǎn)變?yōu)橥讲僮?,其流程大概如下?/p>

通過(guò)上述分析可以發(fā)現(xiàn),kafka 消息發(fā)送本質(zhì)上都是異步的,不過(guò)同步發(fā)送通過(guò) waitGroup 將異步操作轉(zhuǎn)變?yōu)橥讲僮?。同步發(fā)送在一定程度上確保了我們?cè)诳缇W(wǎng)絡(luò)向 Broker 傳輸消息時(shí),消息一定可以可靠地傳輸?shù)?Broker。因?yàn)樵谕桨l(fā)送場(chǎng)景我們可以明確感知消息是否發(fā)送至 Broker,若因網(wǎng)絡(luò)抖動(dòng)、機(jī)器宕機(jī)等故障導(dǎo)致消息發(fā)送失敗或結(jié)果不明,可通過(guò)重試等手段確保消息至少一次(at least once) 發(fā)送到 Broker。另外,Kafka(0.11.0.0 版本后)還為 Producer 提供兩種機(jī)制來(lái)實(shí)現(xiàn)精確一次(exactly once) 消息發(fā)送:冪等性(Idempotence)和事務(wù)(Transaction)。

小結(jié)

通過(guò) ack 策略配置、同步發(fā)送、事務(wù)消息組合能力,我們可以實(shí)現(xiàn)exactly once 語(yǔ)意跨網(wǎng)絡(luò)向 Broker 傳輸消息。但是,Producer 收到 Broker 的成功 ack,消息一定不會(huì)丟失嗎?為了搞清這個(gè)問(wèn)題,我們首先要搞明白 Broker 在接收到消息后做了哪些處理。

發(fā)送到 Broker 的消息可靠持久化

為了確保 Producer 收到 Broker 的成功 ack 后,消息一定不在 Broker 環(huán)節(jié)丟失,我們核心要關(guān)注以下幾點(diǎn):

  • Broker 返回 Producer 成功 ack 時(shí),消息是否已經(jīng)落盤;
  • Broker 宕機(jī)是否會(huì)導(dǎo)致數(shù)據(jù)丟失,容災(zāi)機(jī)制是什么;
  • Replica 副本機(jī)制帶來(lái)的多副本間數(shù)據(jù)同步一致性問(wèn)題如何解決;

Broker 異步刷盤機(jī)制

kafka 為了獲得更高吞吐,Broker 接收到消息后只是將數(shù)據(jù)寫入 PageCache 后便認(rèn)為消息已寫入成功,而 PageCache 中的數(shù)據(jù)通過(guò) linux 的 flusher 程序進(jìn)行異步刷盤(刷盤觸發(fā)條:主動(dòng)調(diào)用 sync 或 fsync 函數(shù)、可用內(nèi)存低于閥值、dirty data 時(shí)間達(dá)到閥值),將數(shù)據(jù)順序?qū)懙酱疟P。消息處理示意圖如下:

由于消息是寫入到 pageCache,單機(jī)場(chǎng)景,如果還沒(méi)刷盤 Broker 就宕機(jī)了,那么 Producer 產(chǎn)生的這部分?jǐn)?shù)據(jù)就可能丟失。為了解決單機(jī)故障可能帶來(lái)的數(shù)據(jù)丟失問(wèn)題,Kafka 為分區(qū)引入了副本機(jī)制。

Replica 副本機(jī)制

Kafka 每組分區(qū)通常有多個(gè)副本,同組分區(qū)的不同副本分布在不同的 Broker 上,保存相同的消息(可能有滯后)。副本之間是“一主多從”的關(guān)系,其中 leader 副本負(fù)責(zé)處理讀寫請(qǐng)求,follower 副本負(fù)責(zé)從 leader 拉取消息進(jìn)行同步。分區(qū)的所有副本統(tǒng)稱為 AR(Assigned Replicas),其中所有與 leader 副本保持一定同步的副本(包括 leader 副本在內(nèi))組成 ISR(In-Sync Replicas),與 leader 同步滯后過(guò)多的副本組成 OSR(Out-of-Sync Replicas),由此可見(jiàn),AR=ISR+OSR。

follower 副本是否與 leader 同步的判斷標(biāo)準(zhǔn)取決于 Broker 端參數(shù) replica.lag.time.max.ms(默認(rèn)為 10 秒),follower 默認(rèn)每隔 500ms 向 leader fetch 一次數(shù)據(jù),只要一個(gè) Follower 副本落后 Leader 副本的時(shí)間不連續(xù)超過(guò) 10 秒,那么 Kafka 就認(rèn)為該 Follower 副本與 leader 是同步的。在正常情況下,所有的 follower 副本都應(yīng)該與 leader 副本保持一定程度的同步,即 AR=ISR,OSR 集合為空。

當(dāng) leader 副本所在 Broker 宕機(jī)時(shí),Kafka 會(huì)借助 ZK 從 follower 副本中選舉新的 leader 繼續(xù)對(duì)外提供服務(wù),實(shí)現(xiàn)故障的自動(dòng)轉(zhuǎn)移,保證服務(wù)可用。為了使選舉的新 leader 和舊 leader 數(shù)據(jù)盡可能一致,當(dāng) leader 副本發(fā)生故障時(shí),默認(rèn)情況下只有在 ISR 集合中的副本才有資格被選舉為新的 leader,而在 OSR 集合中的副本則沒(méi)有任何機(jī)會(huì)(可通過(guò)設(shè)置 unclean.leader.election.enable 改變)。

當(dāng) Kafka 通過(guò)多副本機(jī)制解決單機(jī)故障問(wèn)題時(shí),同時(shí)也帶來(lái)了多副本間數(shù)據(jù)同步一致性問(wèn)題。Kafka 通過(guò)高水位更新機(jī)制、副本同步機(jī)制、 Leader Epoch 等多種措施解決了多副本間數(shù)據(jù)同步一致性問(wèn)題,下面我們來(lái)依次看下這幾大措施。

HW 和 LEO

首先,我們來(lái)看下兩個(gè)和 Kafka 中日志相關(guān)的重要概念 HW 和 LEO:

  • HW: High Watermark,高水位,表示已經(jīng)提交(commit)的最大日志偏移量,Kafka 中某條日志“已提交”的意思是 ISR 中所有節(jié)點(diǎn)都包含了此條日志,并且消費(fèi)者只能消費(fèi) HW 之前的數(shù)據(jù);
  • LEO: Log End Offset,表示當(dāng)前 log 文件中下一條待寫入消息的 offset;

如上圖所示,它代表一個(gè)日志文件,這個(gè)日志文件中有 8 條消息,0 至 5 之間的消息為已提交消息,5 至 7 的消息為未提交消息。日志文件的 HW 為 6,表示消費(fèi)者只能拉取到 5 之前的消息,而 offset 為 5 的消息對(duì)消費(fèi)者而言是不可見(jiàn)的。日志文件的 LEO 為 8,下一條消息將在此處寫入。

注意:所有副本都有對(duì)應(yīng)的 HW 和 LEO,只不過(guò) Leader 副本比較特殊,Kafka 使用 Leader 副本的高水位來(lái)定義所在分區(qū)的高水位。換句話說(shuō),分區(qū)的高水位就是其 Leader 副本的高水位。Leader 副本和 Follower 副本的 HW 有如下特點(diǎn):

  • Leader HW:min(所有副本 LEO),為此 Leader 副本不僅要保存自己的 HW 和 LEO,還要保存 follower 副本的 HW 和 LEO,而 follower 副本只需保存自己的 HW 和 LEO;
  • Follower HW:min(follower 自身 LEO,leader HW)。

注意:為方便描述,下面Leader HW簡(jiǎn)記為HWL,F(xiàn)ollower HW簡(jiǎn)記為F,Leader LEO簡(jiǎn)記為L(zhǎng)EOL ,F(xiàn)ollower LEO簡(jiǎn)記為L(zhǎng)EOF。

下面我們演示一次完整的 HW / LEO 更新流程:

  1. 初始狀態(tài)

HWL=0,LEOL=0,HWF=0,LEOF=0。

  1. Follower 第一次 fetch
  • Leader收到Producer發(fā)來(lái)的一條消息完成存儲(chǔ), 更新LEOL=1;
  • Follower從Leader fetch數(shù)據(jù),  Leader收到請(qǐng)求,記錄follower的LEOF =0,并且嘗試更新HWL =min(全部副本LEO)=0;
  • eade返回HWL=0和LEOL=1給Follower,F(xiàn)ollower存儲(chǔ)消息并更新LEOF =1, HW=min(LEOF,HWL)=0。
  1. Follower 第二次 fetch
  • Follower再次從Leader fetch數(shù)據(jù),  Leader收到請(qǐng)求,記錄follower的LEOF =1,并且嘗試更新HWL =min(全部副本LEO)=1;
  • leade返回HWL=1和LEOL=1給Follower,Leader收到請(qǐng)求,更新自己的 HW=min(LEOF,HWL)=1。

上述更新流程中 Follower 和 Leader 的 HW 更新有時(shí)間 GAP。如果 Leader 節(jié)點(diǎn)在此期間發(fā)生故障,則 Follower 的 HW 和 Leader 的 HW 可能會(huì)處于不一致?tīng)顟B(tài),如果 Followe 被選為新的 Leader 并且以自己的 HW 為準(zhǔn)對(duì)外提供服務(wù),則可能帶來(lái)數(shù)據(jù)丟失或數(shù)據(jù)錯(cuò)亂問(wèn)題。

KIP-101 問(wèn)題:數(shù)據(jù)丟失&數(shù)據(jù)錯(cuò)亂 ^參 5^

數(shù)據(jù)丟失

第 1 步:

  1. 副本 B 作為 leader 收到 producer 的 m2 消息并寫入本地文件,等待副本 A 拉取。
  2. 副本 A 發(fā)起消息拉取請(qǐng)求,請(qǐng)求中攜帶自己的最新的日志 offset(LEO=1),B 收到后更新自己的 HW 為 1,并將 HW=1 的信息以及消息 m2 返回給 A。
  3. A 收到拉取結(jié)果后更新本地的 HW 為 1,并將 m2 寫入本地文件。發(fā)起新一輪拉取請(qǐng)求(LEO=2),B 收到 A 拉取請(qǐng)求后更新自己的 HW 為 2,沒(méi)有新數(shù)據(jù)只將 HW=2 的信息返回給 A,并且回復(fù)給 producer 寫入成功。此處的狀態(tài)就是圖中第一步的狀態(tài)。

第 2 步:

此時(shí),如果沒(méi)有異常,A 會(huì)收到 B 的回復(fù),得知目前的 HW 為 2,然后更新自身的 HW 為 2。但在此時(shí) A 重啟了,沒(méi)有來(lái)得及收到 B 的回復(fù),此時(shí) B 仍然是 leader。A 重啟之后會(huì)以 HW 為標(biāo)準(zhǔn)截?cái)嘧约旱娜罩?,因?yàn)?A 作為 follower 不知道多出的日志是否是被提交過(guò)的,防止數(shù)據(jù)不一致從而截?cái)喽嘤嗟臄?shù)據(jù)并嘗試從 leader 那里重新同步。

第 3 步:

B 崩潰了,min.isr 設(shè)置的是 1,所以 zookeeper 會(huì)從 ISR 中再選擇一個(gè)作為 leader,也就是 A,但是 A 的數(shù)據(jù)不是完整的,從而出現(xiàn)了數(shù)據(jù)丟失現(xiàn)象。

問(wèn)題在哪里?在于 A 重啟之后以 HW 為標(biāo)準(zhǔn)截?cái)嗔硕嘤嗟娜罩?。不截?cái)嘈胁恍校坎恍?,因?yàn)檫@個(gè)日志可能沒(méi)被提交過(guò)(也就是沒(méi)有被 ISR 中的所有節(jié)點(diǎn)寫入過(guò)),如果保留會(huì)導(dǎo)致日志錯(cuò)亂。

數(shù)據(jù)錯(cuò)亂

在分析日志錯(cuò)亂的問(wèn)題之前,我們需要了解到 kafka 的副本可靠性保證有一個(gè)前提:在 ISR 中至少有一個(gè)節(jié)點(diǎn)。如果節(jié)點(diǎn)均宕機(jī)的情況下,是不保證可靠性的,在這種情況會(huì)出現(xiàn)數(shù)據(jù)丟失,數(shù)據(jù)丟失是可接受的。這里我們分析的問(wèn)題比數(shù)據(jù)丟失更加槽糕,會(huì)引發(fā)日志錯(cuò)亂甚至導(dǎo)致整個(gè)系統(tǒng)異常,而這是不可接受的。

第 1 步:

  1. A 和 B 均為 ISR 中的節(jié)點(diǎn)。副本 A 作為 leader,收到 producer 的消息 m2 的請(qǐng)求后寫入 PageCache 并在某個(gè)時(shí)刻刷新到本地磁盤。
  2. 副本 B 拉取到 m2 后寫入 PageCage 后(尚未刷盤)再次去 A 中拉取新消息并告知 A 自己的 LEO=2,A 收到更新自己的 HW 為 1 并回復(fù)給 producer 成功。
  3. 此時(shí) A 和 B 同時(shí)宕機(jī),B 的 m2 由于尚未刷盤,所以 m2 消息丟失。此時(shí)的狀態(tài)就是第 1 步的狀態(tài)。

第 2 步:

由于 A 和 B 均宕機(jī),而 min.isr=1 并且 unclean.leader.election.enable=true(關(guān)閉 unclean 選擇策略),所以 Kafka 會(huì)等到第一個(gè) ISR 中的節(jié)點(diǎn)恢復(fù)并選為 leader,這里不幸的是 B 被選為 leader,而且還接收到 producer 發(fā)來(lái)的新消息 m3。注意,這里丟失 m2 消息是可接受的,畢竟所有節(jié)點(diǎn)都宕機(jī)了。

第 3 步:

A 恢復(fù)重啟后發(fā)現(xiàn)自己是 follower,而且 HW 為 2,并沒(méi)有多余的數(shù)據(jù)需要截?cái)?,所以開(kāi)始和 B 進(jìn)行新一輪的同步。但此時(shí) A 和 B 均沒(méi)有意識(shí)到,offset 為 1 的消息不一致了。

問(wèn)題在哪里?在于日志的寫入是異步的,上面也提到 Kafka 的副本策略的一個(gè)設(shè)計(jì)是消息的持久化是異步的,這就會(huì)導(dǎo)致在場(chǎng)景二的情況下被選出的 leader 不一定包含所有數(shù)據(jù),從而引發(fā)日志錯(cuò)亂的問(wèn)題。

Leader Epoch

為了解決上述缺陷,Kafka 引入了 Leader Epoch 的概念。leader epoch 和 raft 中的任期號(hào)的概念很類似,每次重新選擇 leader 的時(shí)候,用一個(gè)嚴(yán)格單調(diào)遞增的 id 來(lái)標(biāo)志,可以讓所有 follower 意識(shí)到 leader 的變化。而 follower 也不再以 HW 為準(zhǔn),每次奔潰重啟后都需要去 leader 那邊確認(rèn)下當(dāng)前 leader 的日志是從哪個(gè) offset 開(kāi)始的。下面看下 Leader Epoch 是如何解決上面兩個(gè)問(wèn)題的。

數(shù)據(jù)丟失解決

這里的關(guān)鍵點(diǎn)在于副本 A 重啟后作為 follower,不是忙著以 HW 為準(zhǔn)截?cái)嘧约旱娜罩?,而是先發(fā)起 LeaderEpochRequest 詢問(wèn)副本 B 第 0 代的最新的偏移量是多少,副本 B 會(huì)返回自己的 LEO 為 2 給副本 A,A 此時(shí)就知道消息 m2 不能被截?cái)?,所?m2 得到了保留。當(dāng) A 選為 leader 的時(shí)候就保留了所有已提交的日志,日志丟失的問(wèn)題得到解決。

如果發(fā)起 LeaderEpochRequest 的時(shí)候就已經(jīng)掛了怎么辦?這種場(chǎng)景下,不會(huì)出現(xiàn)日志丟失,因?yàn)楦北?A 被選為 leader 后不會(huì)截?cái)嘧约旱娜罩?,日志截?cái)嘀粫?huì)發(fā)生在 follower 身上。

數(shù)據(jù)錯(cuò)亂解決

這里的關(guān)鍵點(diǎn)還是在第 3 步,副本 A 重啟作為 follower 的第一步還是需要發(fā)起 LeaderEpochRequest 詢問(wèn) leader 當(dāng)前第 0 代最新的偏移量是多少,由于副本 B 已經(jīng)經(jīng)過(guò)換代,所以會(huì)返回給 A 第 1 代的起始偏移(也就是 1),A 發(fā)現(xiàn)沖突后會(huì)截?cái)嘧约浩屏繛?1 的日志,并重新開(kāi)始和 leader 同步。副本 A 和副本 B 的日志達(dá)到了一致,解決了日志錯(cuò)亂。

小結(jié)

Broker 接收到消息后只是將數(shù)據(jù)寫入 PageCache 后便認(rèn)為消息已寫入成功,但是,通過(guò)副本機(jī)制并結(jié)合 ACK 策略可以大概率規(guī)避單機(jī)宕機(jī)帶來(lái)的數(shù)據(jù)丟失問(wèn)題,并通過(guò) HW、副本同步機(jī)制、 Leader Epoch 等多種措施解決了多副本間數(shù)據(jù)同步一致性問(wèn)題,最終實(shí)現(xiàn)了 Broker 數(shù)據(jù)的可靠持久化。

消費(fèi)者從 Broker 消費(fèi)到消息且最好只消費(fèi)一次

Consumer 在消費(fèi)消息的過(guò)程中需要向 Kafka 匯報(bào)自己的位移數(shù)據(jù),只有當(dāng) Consumer 向 Kafka 匯報(bào)了消息位移,該條消息才會(huì)被 Broker 認(rèn)為已經(jīng)被消費(fèi)。因此,Consumer 端消息的可靠性主要和 offset 提交方式有關(guān),Kafka 消費(fèi)端提供了兩種消息提交方式:

正常情況下我們很難實(shí)現(xiàn) exactly once 語(yǔ)意的消息,通常是通過(guò)手動(dòng)提交+冪等實(shí)現(xiàn)消息的可靠消費(fèi)。

Kafka 高性能探究

Kafka 高性能的核心是保障系統(tǒng)低延遲、高吞吐地處理消息,為此,Kafaka 采用了許多精妙的設(shè)計(jì):

  • 異步發(fā)送
  • 批量發(fā)送
  • 壓縮技術(shù)
  • Pagecache 機(jī)制&順序追加落盤
  • 零拷貝
  • 稀疏索引
  • broker & 數(shù)據(jù)分區(qū)
  • 多 reactor 多線程網(wǎng)絡(luò)模型

異步發(fā)送

如上文所述,Kafka 提供了異步和同步兩種消息發(fā)送方式。在異步發(fā)送中,整個(gè)流程都是異步的。調(diào)用異步發(fā)送方法后,消息會(huì)被寫入 channel,然后立即返回成功。Dispatcher 協(xié)程會(huì)從 channel 輪詢消息,將其發(fā)送到 Broker,同時(shí)會(huì)有另一個(gè)異步協(xié)程負(fù)責(zé)處理 Broker 返回的結(jié)果。同步發(fā)送本質(zhì)上也是異步的,但是在處理結(jié)果時(shí),同步發(fā)送通過(guò) waitGroup 將異步操作轉(zhuǎn)換為同步。使用異步發(fā)送可以最大化提高消息發(fā)送的吞吐能力。

批量發(fā)送

Kafka 支持批量發(fā)送消息,將多個(gè)消息打包成一個(gè)批次進(jìn)行發(fā)送,從而減少網(wǎng)絡(luò)傳輸?shù)拈_(kāi)銷,提高網(wǎng)絡(luò)傳輸?shù)男屎屯掏铝俊afka 的批量發(fā)送消息是通過(guò)以下兩個(gè)參數(shù)來(lái)控制的:

  1. batch.size:控制批量發(fā)送消息的大小,默認(rèn)值為 16KB,可適當(dāng)增加 batch.size 參數(shù)值提升吞吐。但是,需要注意的是,如果批量發(fā)送的大小設(shè)置得過(guò)大,可能會(huì)導(dǎo)致消息發(fā)送的延遲增加,因此需要根據(jù)實(shí)際情況進(jìn)行調(diào)整。
  2. linger.ms:控制消息在批量發(fā)送前的等待時(shí)間,默認(rèn)值為 0。當(dāng) linger.ms 大于 0 時(shí),如果有消息發(fā)送,Kafka 會(huì)等待指定的時(shí)間,如果等待時(shí)間到達(dá)或者批量大小達(dá)到 batch.size,就會(huì)將消息打包成一個(gè)批次進(jìn)行發(fā)送??蛇m當(dāng)增加 linger.ms 參數(shù)值提升吞吐,比如 10 ~ 100。

在 Kafka 的生產(chǎn)者客戶端中,當(dāng)發(fā)送消息時(shí),如果啟用了批量發(fā)送,Kafka 會(huì)將消息緩存到緩沖區(qū)中。當(dāng)緩沖區(qū)中的消息大小達(dá)到 batch.size 或者等待時(shí)間到達(dá) linger.ms 時(shí),Kafka 會(huì)將緩沖區(qū)中的消息打包成一個(gè)批次進(jìn)行發(fā)送。如果在等待時(shí)間內(nèi)沒(méi)有達(dá)到 batch.size,Kafka 也會(huì)將緩沖區(qū)中的消息發(fā)送出去,從而避免消息積壓。

壓縮技術(shù)

Kafka 支持壓縮技術(shù),通過(guò)將消息進(jìn)行壓縮后再進(jìn)行傳輸,從而減少網(wǎng)絡(luò)傳輸?shù)拈_(kāi)銷(壓縮和解壓縮的過(guò)程會(huì)消耗一定的 CPU 資源,因此需要根據(jù)實(shí)際情況進(jìn)行調(diào)整。),提高網(wǎng)絡(luò)傳輸?shù)男屎屯掏铝俊afka 支持多種壓縮算法,在 Kafka2.1.0 版本之前,僅支持 GZIP,Snappy 和 LZ4,2.1.0 后還支持 Zstandard 算法(Facebook 開(kāi)源,能夠提供超高壓縮比)。這些壓縮算法性能對(duì)比(兩指標(biāo)都是越高越好)如下:

  • 吞吐量:LZ4>Snappy>zstd 和 GZIP,壓縮比:zstd>LZ4>GZIP>Snappy。

在 Kafka 中,壓縮技術(shù)是通過(guò)以下兩個(gè)參數(shù)來(lái)控制的:

  1. compression.type:控制壓縮算法的類型,默認(rèn)值為 none,表示不進(jìn)行壓縮。
  2. compression.level:控制壓縮的級(jí)別,取值范圍為 0-9,默認(rèn)值為-1。當(dāng)值為-1 時(shí),表示使用默認(rèn)的壓縮級(jí)別。

在 Kafka 的生產(chǎn)者客戶端中,當(dāng)發(fā)送消息時(shí),如果啟用了壓縮技術(shù),Kafka 會(huì)將消息進(jìn)行壓縮后再進(jìn)行傳輸。在消費(fèi)者客戶端中,如果消息進(jìn)行了壓縮,Kafka 會(huì)在消費(fèi)消息時(shí)將其解壓縮。注意:Broker 如果設(shè)置了和生產(chǎn)者不通的壓縮算法,接收消息后會(huì)解壓后重新壓縮保存。Broker 如果存在消息版本兼容也會(huì)觸發(fā)解壓后再壓縮。

Pagecache 機(jī)制&順序追加落盤

kafka 為了提升系統(tǒng)吞吐、降低時(shí)延,Broker 接收到消息后只是將數(shù)據(jù)寫入PageCache后便認(rèn)為消息已寫入成功,而 PageCache 中的數(shù)據(jù)通過(guò) linux 的 flusher 程序進(jìn)行異步刷盤(避免了同步刷盤的巨大系統(tǒng)開(kāi)銷),將數(shù)據(jù)順序追加寫到磁盤日志文件中。由于 pagecache 是在內(nèi)存中進(jìn)行緩存,因此讀寫速度非???,可以大大提高讀寫效率。順序追加寫充分利用順序 I/O 寫操作,避免了緩慢的隨機(jī) I/O 操作,可有效提升 Kafka 吞吐。

如上圖所示,消息被順序追加到每個(gè)分區(qū)日志文件的尾部。

零拷貝

Kafka 中存在大量的網(wǎng)絡(luò)數(shù)據(jù)持久化到磁盤(Producer 到 Broker)和磁盤文件通過(guò)網(wǎng)絡(luò)發(fā)送(Broker 到 Consumer)的過(guò)程,這一過(guò)程的性能直接影響 Kafka 的整體吞吐量。傳統(tǒng)的 IO 操作存在多次數(shù)據(jù)拷貝和上下文切換,性能比較低。Kafka 利用零拷貝技術(shù)提升上述過(guò)程性能,其中網(wǎng)絡(luò)數(shù)據(jù)持久化磁盤主要用 mmap 技術(shù),網(wǎng)絡(luò)數(shù)據(jù)傳輸環(huán)節(jié)主要使用 sendfile 技術(shù)。

索引加速之 mmap

傳統(tǒng)模式下,數(shù)據(jù)從網(wǎng)絡(luò)傳輸?shù)轿募枰?4 次數(shù)據(jù)拷貝、4 次上下文切換和兩次系統(tǒng)調(diào)用。如下圖所示:

為了減少上下文切換以及數(shù)據(jù)拷貝帶來(lái)的性能開(kāi)銷,Kafka使用mmap來(lái)處理其索引文件。Kafka中的索引文件用于在提取日志文件中的消息時(shí)進(jìn)行高效查找。這些索引文件被維護(hù)為內(nèi)存映射文件,這允許Kafka快速訪問(wèn)和搜索內(nèi)存中的索引,從而加速在日志文件中定位消息的過(guò)程。mmap 將內(nèi)核中讀緩沖區(qū)(read buffer)的地址與用戶空間的緩沖區(qū)(user buffer)進(jìn)行映射,從而實(shí)現(xiàn)內(nèi)核緩沖區(qū)與應(yīng)用程序內(nèi)存的共享,省去了將數(shù)據(jù)從內(nèi)核讀緩沖區(qū)(read buffer)拷貝到用戶緩沖區(qū)(user buffer)的過(guò)程,整個(gè)拷貝過(guò)程會(huì)發(fā)生 4 次上下文切換,1 次CPU 拷貝和 2次 DMA 拷貝。

網(wǎng)絡(luò)數(shù)據(jù)傳輸之 sendfile

傳統(tǒng)方式實(shí)現(xiàn):先讀取磁盤、再用 socket 發(fā)送,實(shí)際也是進(jìn)過(guò)四次 copy。如下圖所示:

為了減少上下文切換以及數(shù)據(jù)拷貝帶來(lái)的性能開(kāi)銷,Kafka 在 Consumer 從 Broker 讀數(shù)據(jù)過(guò)程中使用了 sendfile 技術(shù)。具體在這里采用的方案是通過(guò) NIO 的 transferTo/transferFrom 調(diào)用操作系統(tǒng)的 sendfile 實(shí)現(xiàn)零拷貝??偣舶l(fā)生 2 次內(nèi)核數(shù)據(jù)拷貝、2 次上下文切換和一次系統(tǒng)調(diào)用,消除了 CPU 數(shù)據(jù)拷貝,如下:

稀疏索引

為了方便對(duì)日志進(jìn)行檢索和過(guò)期清理,kafka 日志文件除了有用于存儲(chǔ)日志的.log 文件,還有一個(gè)位移索引文件.index和一個(gè)時(shí)間戳索引文件.timeindex 文件,并且三文件的名字完全相同,如下:

Kafka 的索引文件是按照稀疏索引的思想進(jìn)行設(shè)計(jì)的。稀疏索引的核心是不會(huì)為每個(gè)記錄都保存索引,而是寫入一定的記錄之后才會(huì)增加一個(gè)索引值,具體這個(gè)間隔有多大則通過(guò) log.index.interval.bytes 參數(shù)進(jìn)行控制,默認(rèn)大小為 4 KB,意味著 Kafka 至少寫入 4KB 消息數(shù)據(jù)之后,才會(huì)在索引文件中增加一個(gè)索引項(xiàng)??梢?jiàn),單條消息大小會(huì)影響 Kakfa 索引的插入頻率,因此 log.index.interval.bytes 也是 Kafka 調(diào)優(yōu)一個(gè)重要參數(shù)值。由于索引文件也是按照消息的順序性進(jìn)行增加索引項(xiàng)的,因此 Kafka 可以利用二分查找算法來(lái)搜索目標(biāo)索引項(xiàng),把時(shí)間復(fù)雜度降到了 O(lgN),大大減少了查找的時(shí)間。

位移索引文件.index

位移索引文件的索引項(xiàng)結(jié)構(gòu)如下:

相對(duì)位移:保存于索引文件名字上面的起始位移的差值,假設(shè)一個(gè)索引文件為:00000000000000000100.index,那么起始位移值即 100,當(dāng)存儲(chǔ)位移為 150 的消息索引時(shí),在索引文件中的相對(duì)位移則為 150 - 100 = 50,這么做的好處是使用 4 字節(jié)保存位移即可,可以節(jié)省非常多的磁盤空間。

文件物理位置:消息在 log 文件中保存的位置,也就是說(shuō) Kafka 可根據(jù)消息位移,通過(guò)位移索引文件快速找到消息在 log 文件中的物理位置,有了該物理位置的值,我們就可以快速地從 log 文件中找到對(duì)應(yīng)的消息了。下面我用圖來(lái)表示 Kafka 是如何快速檢索消息:

假設(shè) Kafka 需要找出位移為 3550 的消息,那么 Kafka 首先會(huì)使用二分查找算法找到小于 3550 的最大索引項(xiàng):[3528, 2310272],得到索引項(xiàng)之后,Kafka 會(huì)根據(jù)該索引項(xiàng)的文件物理位置在 log 文件中從位置 2310272 開(kāi)始順序查找,直至找到位移為 3550 的消息記錄為止。

時(shí)間戳索引文件.timeindex

Kafka 在 0.10.0.0 以后的版本當(dāng)中,消息中增加了時(shí)間戳信息,為了滿足用戶需要根據(jù)時(shí)間戳查詢消息記錄,Kafka 增加了時(shí)間戳索引文件,時(shí)間戳索引文件的索引項(xiàng)結(jié)構(gòu)如下:

時(shí)間戳索引文件的檢索與位移索引文件類似,如下快速檢索消息示意圖:

broker & 數(shù)據(jù)分區(qū)

Kafka 集群包含多個(gè) broker。一個(gè) topic 下通常有多個(gè) partition,partition 分布在不同的 Broker 上,用于存儲(chǔ) topic 的消息,這使 Kafka 可以在多臺(tái)機(jī)器上處理、存儲(chǔ)消息,給 kafka 提供給了并行的消息處理能力和橫向擴(kuò)容能力。

多 reactor 多線程網(wǎng)絡(luò)模型

多 Reactor 多線程網(wǎng)絡(luò)模型 是一種高效的網(wǎng)絡(luò)通信模型,可以充分利用多核 CPU 的性能,提高系統(tǒng)的吞吐量和響應(yīng)速度。Kafka 為了提升系統(tǒng)的吞吐,在 Broker 端處理消息時(shí)采用了該模型,示意如下:

SocketServer和KafkaRequestHandlerPool是其中最重要的兩個(gè)組件:

  • SocketServer:實(shí)現(xiàn) Reactor 模式,用于處理多個(gè) Client(包括客戶端和其他 broker 節(jié)點(diǎn))的并發(fā)請(qǐng)求,并將處理結(jié)果返回給 Client
  • KafkaRequestHandlerPool:Reactor 模式中的 Worker 線程池,里面定義了多個(gè)工作線程,用于處理實(shí)際的 I/O 請(qǐng)求邏輯。

整個(gè)服務(wù)端處理請(qǐng)求的流程大致分為以下幾個(gè)步驟:

  1. Acceptor 接收客戶端發(fā)來(lái)的請(qǐng)求
  2. 輪詢分發(fā)給 Processor 線程處理
  3. Processor 將請(qǐng)求封裝成 Request 對(duì)象,放到 RequestQueue 隊(duì)列
  4. KafkaRequestHandlerPool 分配工作線程,處理 RequestQueue 中的請(qǐng)求
  5. KafkaRequestHandler 線程處理完請(qǐng)求后,將響應(yīng) Response 返回給 Processor 線程
  6. Processor 線程將響應(yīng)返回給客戶端

其他知識(shí)探究

負(fù)載均衡

生產(chǎn)者負(fù)載均衡

Kafka 生產(chǎn)端的負(fù)載均衡主要指如何將消息發(fā)送到合適的分區(qū)。Kafka 生產(chǎn)者生產(chǎn)消息時(shí),根據(jù)分區(qū)器將消息投遞到指定的分區(qū)中,所以 Kafka 的負(fù)載均衡很大程度上依賴于分區(qū)器。Kafka 默認(rèn)的分區(qū)器是 Kafka 提供的 DefaultPartitioner。它的分區(qū)策略是根據(jù) Key 值進(jìn)行分區(qū)分配的:

  • 如果 key 不為 null:對(duì) Key 值進(jìn)行 Hash 計(jì)算,從所有分區(qū)中根據(jù) Key 的 Hash 值計(jì)算出一個(gè)分區(qū)號(hào);擁有相同 Key 值的消息被寫入同一個(gè)分區(qū),順序消息實(shí)現(xiàn)的關(guān)鍵;
  • 如果 key 為 null:消息將以輪詢的方式,在所有可用分區(qū)中分別寫入消息。如果不想使用 Kafka 默認(rèn)的分區(qū)器,用戶可以實(shí)現(xiàn) Partitioner 接口,自行實(shí)現(xiàn)分區(qū)方法。

消費(fèi)者負(fù)載均衡

在 Kafka 中,每個(gè)分區(qū)(Partition)只能由一個(gè)消費(fèi)者組中的一個(gè)消費(fèi)者消費(fèi)。當(dāng)消費(fèi)者組中有多個(gè)消費(fèi)者時(shí),Kafka 會(huì)自動(dòng)進(jìn)行負(fù)載均衡,將分區(qū)均勻地分配給每個(gè)消費(fèi)者。在 Kafka 中,消費(fèi)者負(fù)載均衡算法可以通過(guò)設(shè)置消費(fèi)者組的 partition.assignment.strategy 參數(shù)來(lái)選擇。目前主流的分區(qū)分配策略以下幾種:

  • range: 在保證均衡的前提下,將連續(xù)的分區(qū)分配給消費(fèi)者,對(duì)應(yīng)的實(shí)現(xiàn)是 RangeAssignor;
  • round-robin:在保證均衡的前提下,輪詢分配,對(duì)應(yīng)的實(shí)現(xiàn)是 RoundRobinAssignor;
  • 0.11.0.0 版本引入了一種新的分區(qū)分配策略 StickyAssignor,其優(yōu)勢(shì)在于能夠保證分區(qū)均衡的前提下盡量保持原有的分區(qū)分配結(jié)果,從而避免許多冗余的分區(qū)分配操作,減少分區(qū)再分配的執(zhí)行時(shí)間。

集群管理

Kafka 借助 ZooKeeper 進(jìn)行集群管理。Kafka 中很多信息都在 ZK 中維護(hù),如 broker 集群信息、consumer 集群信息、 topic 相關(guān)信息、 partition 信息等。Kafka 的很多功能也是基于 ZK 實(shí)現(xiàn)的,如 partition 選主、broker 集群管理、consumer 負(fù)載均衡等,限于篇幅本文將不展開(kāi)陳述,這里先附一張網(wǎng)上截圖大家感受下:

參考文獻(xiàn)

  1. https://www.cnblogs.com/arvinhuang/p/16437948.html
  2. https://segmentfault.com/a/1190000039133960
  3. http://matt33.com/2018/11/04/kafka-transaction/
  4. https://blog.51cto.com/u_14020077/5836698
  5. https://t1mek1ller.github.io/2020/02/15/kafka-leader-epoch/
  6. https://cwiki.apache.org/confluence/display/KAFKA/KIP-101+-+Alter+Replication+Protocol+to+use+Leader+Epoch+rather+than+High+Watermark+for+Truncation
  7. https://xie.infoq.cn/article/c06fea629926e2b6a8073e2f0
  8. https://xie.infoq.cn/article/8191412c8da131e78cbfa6600
  9. https://mp.weixin.qq.com/s/iEk0loXsKsMO_OCVlUsk2Q
  10. https://cloud.tencent.com/developer/article/1657649
  11. https://www.cnblogs.com/vivotech/p/16347074.html

分享題目:Kafka核心全面總結(jié),高可靠高性能核心原理探究
文章起源:http://www.5511xx.com/article/dphshpd.html