新聞中心
Kafka是一個分布式流處理平臺,它被廣泛用于構(gòu)建可擴展、高吞吐量的實時數(shù)據(jù)管道。然而,在處理大量數(shù)據(jù)時,Kafka數(shù)據(jù)丟失的問題會引起許多煩惱。解決這個問題的一種方法是將Kafka的數(shù)據(jù)持久化到數(shù)據(jù)庫中,從而更加可靠地保存數(shù)據(jù)。

成都創(chuàng)新互聯(lián)公司-專業(yè)網(wǎng)站定制、快速模板網(wǎng)站建設(shè)、高性價比朝陽網(wǎng)站開發(fā)、企業(yè)建站全套包干低至880元,成熟完善的模板庫,直接使用。一站式朝陽網(wǎng)站制作公司更省心,省錢,快速模板網(wǎng)站建設(shè)找我們,業(yè)務(wù)覆蓋朝陽地區(qū)。費用合理售后完善,10年實體公司更值得信賴。
Kafka數(shù)據(jù)丟失的原因和解決方法
Kafka的數(shù)據(jù)丟失問題是由于Kafka的寫入機制導(dǎo)致的。Kafka的寫入機制是異步的,不能保證發(fā)布到Kafka的消息會被成功寫入Kafka broker。因此,在某些情況下,Kafka會丟失消息,例如當(dāng)發(fā)生網(wǎng)絡(luò)斷開或Kafka broker宕機時。
為了解決這個問題,Kafka提供了一種常見的方法:使用Kafka的復(fù)制機制來保護數(shù)據(jù)。Kafka的復(fù)制機制將消息復(fù)制到備用副本中,以便在Kafka broker宕機或者數(shù)據(jù)丟失的時候,備用副本可以被用來恢復(fù)數(shù)據(jù)。但是,復(fù)制機制會增加寫入延遲和消息存儲的開銷,如果需要處理高并發(fā)或海量數(shù)據(jù),就需要考慮其他更可靠的方案。
將Kafka數(shù)據(jù)持久化到數(shù)據(jù)庫中的解決方案
將Kafka數(shù)據(jù)持久化到數(shù)據(jù)庫中是解決Kafka數(shù)據(jù)丟失問題的一種可靠方法。這種方法的實現(xiàn)基于Kafka Connect,它是一個開源工具,用于在Kafka和其他數(shù)據(jù)存儲系統(tǒng)之間進行數(shù)據(jù)傳輸。
Kafka Connect的主要作用是將Kafka的數(shù)據(jù)轉(zhuǎn)換為其他數(shù)據(jù)格式并存儲到其他數(shù)據(jù)存儲系統(tǒng)中。要將Kafka的數(shù)據(jù)持久化到數(shù)據(jù)庫中,可以使用Kafka Connect的JDBC連接器。JDBC連接器可以將Kafka消息轉(zhuǎn)換為數(shù)據(jù)庫的記錄并將其插入到數(shù)據(jù)庫中。
以下步驟描述了將Kafka數(shù)據(jù)持久化到數(shù)據(jù)庫的過程:
1. 安裝Kafka Connect:將Kafka Connect安裝在您的本地機器或云服務(wù)器上。
2. 配置Kafka Connect:配置Kafka Connect以使其可以連接到Kafka和數(shù)據(jù)庫。
3. 創(chuàng)建JDBC連接器:使用Kafka Connect創(chuàng)建JDBC連接器,該連接器將消息轉(zhuǎn)換為數(shù)據(jù)庫的記錄,并將其插入到數(shù)據(jù)庫中。
4. 測試連接器:測試連接器以確保它可以正確地將消息保存到數(shù)據(jù)庫中。
將Kafka數(shù)據(jù)持久化到數(shù)據(jù)庫的好處
將Kafka數(shù)據(jù)持久化到數(shù)據(jù)庫的好處有:
1. 可靠性:數(shù)據(jù)會被持久化到數(shù)據(jù)庫中,從而保證數(shù)據(jù)不會丟失。
2. 可擴展性:可以使用數(shù)據(jù)庫的擴展性,無需考慮Kafka復(fù)制機制的限制。
3. 數(shù)據(jù)一致性:如果在Kafka broker宕機或網(wǎng)絡(luò)斷開的情況下,可以使用數(shù)據(jù)庫恢復(fù)數(shù)據(jù)。
4. 數(shù)據(jù)備份:可以使用數(shù)據(jù)庫備份和還原機制對數(shù)據(jù)進行備份和還原。
5. 數(shù)據(jù)安全性:可以使用數(shù)據(jù)庫的安全機制來保護數(shù)據(jù)。
結(jié)論
在處理大量實時數(shù)據(jù)時,Kafka的數(shù)據(jù)丟失問題是一個令人頭痛的問題。解決這個問題的一種方法是將Kafka數(shù)據(jù)持久化到數(shù)據(jù)庫中,從而更加可靠地保存數(shù)據(jù)。使用Kafka Connect的JDBC連接器可以使持久化過程變得更加容易和可管理。因此,如果您在使用Kafka時遇到了數(shù)據(jù)丟失的問題,將Kafka數(shù)據(jù)持久化到數(shù)據(jù)庫中可能是一個可靠的解決方案。
相關(guān)問題拓展閱讀:
- kafka:replica副本同步機制
kafka:replica副本同步機制
Kafka的流行歸功于它設(shè)計和操作簡單、存儲系統(tǒng)高效、充分利用磁盤順序讀寫等特性、非常適合在線日志收集等高吞吐場景。
Kafka特性之一是它的復(fù)制協(xié)議。復(fù)制協(xié)議是保障kafka高可靠性的關(guān)鍵。對于單個集群中每個Broker不同工作負載情況下,如何自動調(diào)優(yōu)Kafka副本的工作方式是比較有挑戰(zhàn)的。它的挑戰(zhàn)之一是要知道如何避免follower進入和退出同步副本列表(即ISR)。從用戶的角度來看,如果生產(chǎn)者發(fā)送一大批海量消息,可能會引起Kafka Broker很多警告。這些警報表明一些topics處于“under replicated”狀態(tài),這些副本處于同步失敗或失效狀態(tài),更意味著數(shù)據(jù)沒有被復(fù)制到足夠數(shù)量Broker從而增加數(shù)據(jù)丟失的概率。因此Kafka集群中處于“under replicated”中Partition數(shù)要密切監(jiān)控。這個警告應(yīng)該來自于Broker失效,減慢或暫停等狀態(tài)而不是生產(chǎn)者寫不同大小消息引起的。
Kafka中主題的每個Partition有一個預(yù)寫式日志文件,每個Partition都由一系列有序的、不可變的消息組成,這些消息被連續(xù)的追加到Partition中,Partition中的每個消息都有一個連續(xù)的序列號叫做offset, 確定它在分區(qū)日志中唯一的位置。
Kafka每個topic的partition有N個副本,其中N是topic的復(fù)制因子。Kafka通過多副本機制實現(xiàn)故障自動轉(zhuǎn)移,當(dāng)Kafka集群中一個Broker失效情況下仍然保證服務(wù)可用。在Kafka中發(fā)生復(fù)制時確保partition的預(yù)寫式日志有序地寫到其他節(jié)點上。N個replicas中。其中一個replica為leader,其他都為follower,leader處理partition的所有讀寫請求,與此同時,follower會被動定期地去復(fù)制leader上的數(shù)據(jù)。
如下圖所示,Kafka集群中有4個broker, 某topic有3個partition,且復(fù)制因子即副本個數(shù)也為3:
Kafka提供了數(shù)據(jù)復(fù)制算法保證,如果leader發(fā)生故障或掛掉,一個新leader被選舉并被接受客戶端的消息成功寫入。Kafka確保從同步副本列表中選舉一個副本為leader,或者說follower追趕leader數(shù)據(jù)。leader負責(zé)維護和跟蹤ISR(In-Sync Replicas的縮寫,表示副本同步隊列,具體可參考下節(jié))中所有follower滯后的狀態(tài)。當(dāng)producer發(fā)送一條消息到broker后,leader寫入消息并復(fù)制到所有follower。消息提交之后才被成功復(fù)制到所有的同步副本。消息復(fù)制延遲受最慢的follower限制,重要的是快速檢測慢副本,如果follower“落后”太多或者失效,leader將會把它從ISR中刪除。
副本同步隊列(ISR)
所謂同步,必須滿足如下兩個條件:
默認情況下Kafka對應(yīng)的topic的replica數(shù)量為1,即每個partition都有一個唯一的肢指leader,為了確保消息的可靠性,通常應(yīng)用中將其值(由broker的參數(shù)offsets.topic.replication.factor指定)大小設(shè)置為大于1,比如3。 所有的副本(replicas)統(tǒng)稱為Assigned Replicas,即AR。ISR是AR中的一個子集,由leader維護ISR列表,follower從leader同步數(shù)據(jù)有一些延遲。任意一個超過閾值都會把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表舉饑,新加入的follower也會先存放在OSR中。AR=ISR+OSR。
上一節(jié)中的HW俗稱高水位,是HighWatermark的縮寫,取一個partition對應(yīng)的ISR中最小的LEO作為HW,consumer最多只能消費到HW所在的位置。另外每個replica都有HW,leader和follower各自負責(zé)更新自己的HW的狀態(tài)。對于leader新寫入的消息,consumer不能立刻消費,leader會等歷答配待該消息被所有ISR中的replicas同步后更新HW,此時消息才能被consumer消費。這樣就保證了如果leader所在的broker失效,該消息仍然可以從新選舉的leader中獲取。對于來自內(nèi)部broKer的讀取請求,沒有HW的限制。
下圖詳細的說明了當(dāng)producer生產(chǎn)消息至broker后,ISR以及HW和LEO的流轉(zhuǎn)過程:
由此可見,Kafka的復(fù)制機制既不是完全的同步復(fù)制,也不是單純的異步復(fù)制。事實上,同步復(fù)制要求所有能工作的follower都復(fù)制完,這條消息才會被commit,這種復(fù)制方式極大的影響了吞吐率。而異步復(fù)制方式下,follower異步的從leader復(fù)制數(shù)據(jù),數(shù)據(jù)只要被leader寫入log就被認為已經(jīng)commit,這種情況下如果follower都還沒有復(fù)制完,落后于leader時,突然leader宕機,則會丟失數(shù)據(jù)。而Kafka的這種使用ISR的方式則很好的均衡了確保數(shù)據(jù)不丟失以及吞吐率。
副本不同步的異常情況
broker 分配的任何一個 partition 都是以 Replica 對象實例的形式存在,而 Replica 在 Kafka 上是有兩個角色: leader 和 follower,只要這個 Replica 是 follower,它便會向 leader 進行數(shù)據(jù)同步。
反映在 ReplicaManager 上就是如果 Broker 的本地副本被選舉為 follower,那么它將會啟動副本同步線程,其具體實現(xiàn)如下所示:
簡單來說,makeFollowers() 的處理過程如下:
關(guān)于第6步,并不一定會為每一個 partition 都啟動一個 fetcher 線程,對于一個目的 broker,只會啟動 num.replica.fetchers 個線程,具體這個 topic-partition 會分配到哪個 fetcher 線程上,是根據(jù) topic 名和 partition id 進行計算得到,實現(xiàn)所示:
如上所示,在 ReplicaManager 調(diào)用 makeFollowers() 啟動 replica fetcher 線程后,它實際上是通過 ReplicaFetcherManager 實例進行相關(guān) topic-partition 同步線程的啟動和關(guān)閉,其啟動過程分為下面兩步:
addFetcherForPartitions() 的具體實現(xiàn)如下所示:
這個方法其實是做了下面這幾件事:
ReplicaFetcherManager 創(chuàng)建 replica Fetcher 線程的實現(xiàn)如下:
replica fetcher 線程在啟動之后就開始進行正常數(shù)據(jù)同步流程了,這個過程都是在 ReplicaFetcherThread 線程中實現(xiàn)的。
ReplicaFetcherThread 的 doWork() 方法是一直在這個線程中的 run() 中調(diào)用的,實現(xiàn)方法如下:
在 doWork() 方法中主要做了兩件事:
processFetchRequest() 這個方法的作用是發(fā)送 Fetch 請求,并對返回的結(jié)果進行處理,最終寫入到本地副本的 Log 實例中,其具體實現(xiàn):
其處理過程簡單總結(jié)一下:
fetch() 方法作用是發(fā)送 Fetch 請求,并返回相應(yīng)的結(jié)果,其具體的實現(xiàn),如下:
processPartitionData
這個方法的作用是,處理 Fetch 請求的具體數(shù)據(jù)內(nèi)容,簡單來說就是:檢查一下數(shù)據(jù)大小是否超過限制、將數(shù)據(jù)追加到本地副本的日志文件中、更新本地副本的 hw 值。
在副本同步的過程中,會遇到哪些異常情況呢?
大家一定會想到關(guān)于 offset 的問題,在 Kafka 中,關(guān)于 offset 的處理,無論是 producer 端、consumer 端還是其他地方,offset 似乎都是一個形影不離的問題。在副本同步時,關(guān)于 offset,會遇到什么問題呢?下面舉兩個異常的場景:
以上兩種情況都是 offset OutOfRange 的情況,只不過:一是 Fetch Offset 超過了 leader 的 LEO,二是 Fetch Offset 小于 leader 最小的 offset
在介紹 Kafka 解決方案之前,我們先來自己思考一下這兩種情況應(yīng)該怎么處理?
上面是我們比較容易想出的解決方案,而在 Kafka 中,其解決方案也很類似,不過遇到情況比上面我們列出的兩種情況多了一些復(fù)雜,其解決方案如下:
針對之一種情況,在 Kafka 中,實際上還會發(fā)生這樣一種情況,1 在收到 OutOfRange 錯誤時,這時去 leader 上獲取的 LEO 值與最小的 offset 值,這時候卻發(fā)現(xiàn) leader 的 LEO 已經(jīng)從 800 變成了 1100(這個 topic-partition 的數(shù)據(jù)量增長得比較快),再按照上面的解決方案就不太合理,Kafka 這邊的解決方案是:遇到這種情況,進行重試就可以了,下次同步時就會正常了,但是依然會有上面說的那個問題。
replica fetcher 線程關(guān)閉的條件,在三種情況下會關(guān)閉對這個 topic-partition 的拉取操作:
這里直接說線程關(guān)閉,其實不是很準確,因為每個 replica fetcher 線程操作的是多個 topic-partition,而在關(guān)閉的粒度是 partition 級別,只有這個線程分配的 partition 全部關(guān)閉后,這個線程才會真正被關(guān)閉。
stopReplica
StopReplica 的請求實際上是 Controller 發(fā)送過來的,這個在 controller 部分會講述,它觸發(fā)的條件有多種,比如:broker 下線、partition replica 遷移等等。
makeLeaders
makeLeaders() 方法的調(diào)用是在 broker 上這個 partition 的副本被設(shè)置為 leader 時觸發(fā)的,其實現(xiàn)如下:
調(diào)用 ReplicaFetcherManager 的 removeFetcherForPartitions() 刪除對這些 topic-partition 的副本同步設(shè)置,這里在實現(xiàn)時,會遍歷所有的 replica fetcher 線程,都執(zhí)行 removePartitions() 方法來移除對應(yīng)的 topic-partition 。
removePartitions
這個方法的作用是:ReplicaFetcherThread 將這些 topic-partition 從自己要拉取的 partition 列表中移除。
ReplicaFetcherThread的關(guān)閉
前面介紹那么多,似乎還是沒有真正去關(guān)閉,那么 ReplicaFetcherThread 真正關(guān)閉是哪里操作的呢?
實際上 ReplicaManager 每次處理完 LeaderAndIsr 請求后,都會調(diào)用 ReplicaFetcherManager 的 shutdownIdleFetcherThreads() 方法,如果 fetcher 線程要拉取的 topic-partition 為空,那么就會關(guān)閉掉對應(yīng)的 fetcher 線程。
關(guān)于kafka數(shù)據(jù)不丟失數(shù)據(jù)庫的介紹到此就結(jié)束了,不知道你從中找到你需要的信息了嗎 ?如果你還想了解更多這方面的信息,記得收藏關(guān)注本站。
成都創(chuàng)新互聯(lián)建站主營:成都網(wǎng)站建設(shè)、網(wǎng)站維護、網(wǎng)站改版的網(wǎng)站建設(shè)公司,提供成都網(wǎng)站制作、成都網(wǎng)站建設(shè)、成都網(wǎng)站推廣、成都網(wǎng)站優(yōu)化seo、響應(yīng)式移動網(wǎng)站開發(fā)制作等網(wǎng)站服務(wù)。
分享標(biāo)題:解決Kafka數(shù)據(jù)不丟失問題,數(shù)據(jù)庫更加可靠(kafka數(shù)據(jù)不丟失數(shù)據(jù)庫)
本文路徑:http://www.5511xx.com/article/djsdcep.html


咨詢
建站咨詢
