日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
Kafka放棄Zookeeper后如何存儲主題與消費組呢?

Kafka放棄ZooKeeper后如何存儲主題與消費組呢?

作者:丁威 2022-03-07 10:15:28

云計算

Kafka 主題的路由信息是存儲在Zookeeper中,那為什么客戶端只需要Broker的地址,就可以獲取到主題的路由信息呢?

專注于為中小企業(yè)提供成都做網(wǎng)站、成都網(wǎng)站制作、成都外貿(mào)網(wǎng)站建設(shè)服務(wù),電腦端+手機端+微信端的三站合一,更高效的管理,為中小企業(yè)宜豐免費做網(wǎng)站提供優(yōu)質(zhì)的服務(wù)。我們立足成都,凝聚了一批互聯(lián)網(wǎng)行業(yè)人才,有力地推動了1000+企業(yè)的穩(wěn)健成長,幫助中小企業(yè)通過網(wǎng)站建設(shè)實現(xiàn)規(guī)模擴充和轉(zhuǎn)變。

由于筆者公司目前使用的kafka版本是2.2.1,故當下關(guān)于kafka的內(nèi)核研究目前主要是基于該版本,當然該專欄還會繼續(xù)關(guān)注Kafka3.0。

我在使用kafka時發(fā)現(xiàn)客戶端可以不依賴Zookeeper的情況下完成消息發(fā)送、消息消費,眾所周知早期的Kafka,所有的元信息(topic、消費組、集群)等信息都存儲在Zookeeper中,原先的消息發(fā)送客戶端、消息消費客戶端都需要依賴Zookeeper。

溫馨提示:Kafka逐步開啟了去zookeeper化,到kafka2.8之前實現(xiàn)了消息發(fā)送者、消息消費者的去zookeeper化,從2.8版之后broker也支持去zookeeper。

那kafka2.2.1版本中,主題的路由信息、消費組信息分別是存儲在什么地方呢?消息發(fā)送端、消息消費端是如何感知的呢?

溫馨提示:如果大家對Kafka有基本的了解,不防停留片刻,稍作思考。

1.主題元數(shù)據(jù)存儲在Zookeeper中

進入到Kafka Broker連接的Zookeeper集群,我們不難發(fā)現(xiàn)在 /{namespace}/brokers/topics節(jié)點下存在該集群中所有的主題信息,展開某一個具體的主題,如下圖所示:

關(guān)于主題的元信息,其實主要包括如下信息:

  • 分區(qū)數(shù)量 每一個具體topic下會有一個partitions節(jié)點,該節(jié)點下的每一個子節(jié)點代表一個分區(qū)。
  • 分區(qū)狀態(tài)信息 每一個分區(qū)的的狀態(tài)由葉子節(jié)點 /{namespace}/brokers/topics/{topicName}/parttions/{partNO}/state表示,存儲的內(nèi)容如下:

controller_epoch 控制器當前的選舉版本。

leader 該分區(qū)的Leader所在的Broker節(jié)點ID。

version 當前的存儲格式版本,默認為1。

leader_epoch 分區(qū)Leader的選舉版本。

isr 分區(qū)的ISR集合。

主題的路由信息是存儲在Zookeeper中,那為什么客戶端只需要Broker的地址,就可以獲取到主題的路由信息呢?

1.1 主題路由尋址

查找路由信息在Kafka2.1版本中是發(fā)送ApiKeys.METADATA請求,該請求的響應(yīng)邏輯定義在Broker中,那客戶端是如何對Broker進行路由,Broker中的路由信息又是從何而來呢?

消息發(fā)送者首次發(fā)送METADATA定位Broker機制:首次發(fā)送請求會從KafkaProducer的bootstrap.servers中設(shè)置的broker列表中選擇當前最空閑的Broker,后續(xù)能感知所有的Broker。

消息消費者發(fā)送METADATA定位Broker機制:發(fā)送到當前消費組的組協(xié)調(diào)所在的Broker。

根據(jù)查閱KafkaApis的handleTopicMetadataRequest方法,進行一些ACL校驗后進入其核心方法:

關(guān)鍵點:

  • 從MetadataCache中獲取topic到路由信息。
  • 如果MetadataCache中不存在指定topic的路由信息,如果Broker允許自動創(chuàng)建主題(auto.create.topics.enable),默認為true,則自動創(chuàng)建該主題的信息,并將主題信息寫入到zookeeper,具體操作:

在/brokers/topics節(jié)點下創(chuàng)建子節(jié)點,子節(jié)點名稱為topic的名稱。

根據(jù)當前kafka分區(qū)的機架信息,分區(qū)數(shù)、副本數(shù),broker節(jié)點數(shù),進行分配,主要盡量將主分區(qū)不放在同一個機架、存儲在主題的節(jié)點信息中,例如{"version":1,"partitions":{"4":[2,0,1],"5":[0,1,2],"1":[2,1,0],"0":[1,0,2],"2":[0,2,1],"3":[1,2,0]}},其中key為分區(qū)名稱,值為副本所在的brokerId,其中排在第一位是傾向性Leader,主題中存儲的值是靜態(tài)數(shù)據(jù),具體還會觸發(fā)選舉,選舉算法會參考這個分配。

控制器還會注冊調(diào)用registerPartitionModificationsHandlers方法,監(jiān)聽主題信息的變化,從而觸發(fā)后續(xù)流程,啟動分區(qū)的真正創(chuàng)建(各個分區(qū)的Leader選舉等)。

溫馨提示:Kafka開啟自動創(chuàng)建主題,分區(qū)數(shù)量取自kafka broker中的num.partitions參數(shù),默認為1,副本因子則取決于default.replication.factor參數(shù),默認為1。

1.2 路由信息同步機制

MetadataCache,元信息緩存,那這里的數(shù)據(jù)又是從何而來呢?MetadataCache中路由信息的更新調(diào)用鏈如下圖所示:

Kafka的KafkaController(后續(xù)統(tǒng)稱控制器)首先會聽/brokers/topics/{topicName}節(jié)點內(nèi)容的變化,一旦有新主題創(chuàng)建或主題信息變更,topic變更事件就會觸發(fā),此時TopicChange的process方法會調(diào)用,最終調(diào)用updatePartitionReplicaAssignment,也就是一旦主題的信息發(fā)生變更,控制器會向所有Broker節(jié)點發(fā)送ApiKeys.UPDATE_METADATA,各個Broker在到該請求后,會更新各個Broker中的內(nèi)存緩存,供消息發(fā)送者查找topic路由信息。

即Kafka2.2版本中,topic的元信息存儲在Zookeeper中,同時Kafka Controller會監(jiān)聽zookeeper中相關(guān)節(jié)點,從而感知信息變更,從而將路由信息通過RPC發(fā)送到集群內(nèi)所有的Broker中,故每一個Broker的內(nèi)存中都存儲一份相同的路由信息。

Kafka2.8版本開始嘗試去Zookeeper化。

思考題:為什么各個Broker不都監(jiān)聽zookeeper,從而感知topic變化,更新本地內(nèi)存呢?歡迎各位留言討論或私信dingwpmz,共同交流。

2.消費組存儲在位點主題中

在較低版本中,啟動Kafka消費組需要指定zookeeper集群的地址,因為在低版本中消費組的元信息存儲在zookeeper中,具體路徑為/consumers,但后續(xù)版本中消費端的啟動已經(jīng)不需指定zookeeper,而是指定broker的地址列表即可,那這個時候,消費組的信息是存儲在哪呢?

在前面介紹Kafka故障解決相關(guān)的文章中我們常??吹较M組組協(xié)調(diào)器,內(nèi)部持有一個消費組元數(shù)據(jù)管理器GroupMetadataManager,相關(guān)的代碼截圖如下所示:

在GroupMetadataManager對象中持有一個Map結(jié)構(gòu)的緩存,其鍵為消費組的名稱,值為GroupMetadata對象,內(nèi)部記錄消費組的狀態(tài),消費組的成員列表,位點信息。

內(nèi)存的特點:訪問高效,但隨著Broker進程的退出而丟失,消費組存儲在內(nèi)存中顯然不行,但又不在zookeeper中,那消費組的定義信息存儲在什么地方呢?

2.1消費組元信息存儲

消費組的定義信息存儲在系統(tǒng)主題__consumer_offsets中,什么,這個主題不是用來存儲消費位點的嗎?

原來__consumer_offsets不僅存儲消費組的位點信息,還存儲消費組的元信息,具體代碼入口:GroupMetadataManager#storeGroup,部分代碼截圖如下所示:

即消費組元信息當成一條消息寫入到__consumer_offsets,一條消費組元信息存儲的value值,由GroupMetadataManager的groupMetadataValue方法定義,具體代碼如下:

隨著Kafka的不斷演化,存儲格式進行了多次修改,對應(yīng)的版本如下:

  • V0:Kafka 0.10級以下版本
  • V1:大于 0.10,低于等于2.1版本。
  • V2:2.2版本及以后

消費組元信息存儲的格式為Json,具體存儲的內(nèi)容:

  • protocol_type 協(xié)議版本,取自AbstractCoordinator的抽象方法protocolType(),消費組的固定為:consumer。
  • generation 消費組元信息的版本號,每發(fā)生一次消費組重平衡,該值會加一。
  • protocol 協(xié)議內(nèi)容,存儲消費組的隊列負載算法,在構(gòu)建消費者時可通過partition.assignment.strategy參數(shù)傳遞,可以傳遞多個,消費組具體的負載算法會選擇每一個消費者都支持的協(xié)議進行隊列負載,默認的負載算法為RangeAssignor。
  • leader 當前消費組的Leader,通常為第一個加入該消費組的消費者。
  • current_state_timestamp 最新狀態(tài)變更的時間戳,該值是從V2版本開始引入。
  • members 消費組的成員信息,每一個成員信息存儲的信息如下:
  • member_id 成員id,客戶端id(clientId) + uuid。

client_id 客戶端ID。

client_host 客戶端ip地址。

rebalance_timeout 重平衡時間,默認為300000,5分鐘。

session_timeout 會話超時時間,默認為10s。

subscription 元信息,取自AbstractCoordinator的抽象方法metadata(),消費組的實現(xiàn)類為ConsumerCoordinator,主要是遍歷負載算法,每一個負載算法根據(jù)訂閱信息計算元信息。

assignment

各個消費者的隊列負載情況。

溫馨提示:GroupMetadataManager的storeGroup方法的調(diào)用時間是在消費組進行重平衡時,具體是重平衡第二階段(SYNC_GROUP)與完成重平衡。

2.2加載消息組元信息

消費組元信息是存儲在 __consumer_offsets主題中,在什么時候會從該主題中加載到內(nèi)存中呢?

在__consumer_offsets的分區(qū)發(fā)生Leader選舉時會觸發(fā)將對應(yīng)分區(qū)中的數(shù)據(jù)加載到內(nèi)存,具體的處理入口在KafkaApis的handleLeaderAndIsrRequest方法,簡易調(diào)用鏈如下圖所示:

3.總結(jié)

本文主要介紹了Kafka 主題與消費組的持久化機制,在Kafka2.8版本開始,官方逐步去除對Zookeeper的依賴,那kafka3.x之后,又會是如何存儲消費組、主題的信息呢?


文章標題:Kafka放棄Zookeeper后如何存儲主題與消費組呢?
文章位置:http://www.5511xx.com/article/cciopsd.html