日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關咨詢
選擇下列產品馬上在線溝通
服務時間:8:30-17:00
你可能遇到了下面的問題
關閉右側工具欄

新聞中心

這里有您想知道的互聯(lián)網營銷解決方案
消息隊列線程池模型如何保證重啟時消息不丟

本文轉載自微信公眾號「咖啡拿鐵」,作者咖啡拿鐵 。轉載本文請聯(lián)系咖啡拿鐵公眾號。

創(chuàng)新互聯(lián)專注于興隆臺網站建設服務及定制,我們擁有豐富的企業(yè)做網站經驗。 熱誠為您提供興隆臺營銷型網站建設,興隆臺網站制作、興隆臺網頁設計、興隆臺網站官網定制、微信小程序開發(fā)服務,打造興隆臺網絡公司原創(chuàng)品牌,更為您提供興隆臺網站排名全網營銷落地服務。

背景

今天在脈脈上面看到了一個帖子,比較有意思:

這個帖子的意思是:在使用Kafka的時候,我們已經設置了多個分區(qū),如何去提升消費能力?如果使用線程池的方式去提升如何保證重啟時消息不丟。

這個題其實問了兩個點,第一個是如何提升消費能力,第二個是如果選擇線程池,我們如何做到消息不丟。

這里先解釋一下這兩個問題到底是怎么回事,在很多消息隊列中都有一個概念叫partion,代表著分區(qū),分區(qū)是我們提高消息隊列消費的關鍵,我們的消費者消費的渠道就是從每個分區(qū)中來的,一個分區(qū)只能被一個消費者持有,如下圖所示:

有點類似銀行排隊,隊列的個數(shù)越多,排隊的時間相對來說就會越少,當然也可以通過異步的方式去處理,比如線程池,把所有的消息都扔到線程池中去執(zhí)行,這就引出了作者說的第二個問題,首先我們來看看同步消費為什么不會丟消息呢?

如果我們使用的是同步模型,當我們消費了之后會將offset ack回去,如果我們出現(xiàn)了重啟,沒有成功offset,那么這部分數(shù)據(jù)將會再次消費,如果是用線程池進行消費,那么我們如何進行ack呢,比如我們用線程池消費了 10,11,12 三條消息如果12先消費完,那么我們ack 13嗎?如果這樣做的話,這個時候重啟,kafka就會認為你已經處理了10,11的消息,這個時候消息就會出現(xiàn)丟失,而發(fā)這個帖子的同學就是對于這一塊是比較疑惑。

網友的回答

我們來看看網友的一些回答:

網友A:

這名網友的回答本質還是使用線程池,作者也回復了,并沒有解決線程池的問題。

網友B:

這個方法類似銀行排隊,只要隊列多,那么處理速度就會加快,的確是第一個問題的解決辦法之一。

網友C:

這一類主要解決了第二個問題,通過外部維護offset,比如通過offset入庫的方式,我們就能找到正確的應該消費的offset,這個相對來說比較復雜,使用一個MQ還得配套一個數(shù)據(jù)庫,萬一我使用MQ的服務根本都沒有數(shù)據(jù)庫,還得單獨去申請。

網友D:

還有另外一種觀點就是,代碼寫好一點,讓消費的速度提高,那消費能力自然就上去了,這個的確是一個很重要的點,通常被其他人給忽略,有時候消費比較慢,很多人可能一上來就是考慮中間件應該怎么設置,往往會忽略自己的代碼。

看了這么多帖子的一個回復,感覺沒有真正能讓我滿意的答案,下面來說說我心中的一些思路。

我的想法

對于第一個問題的話,如何提升消費能力?這個問題其實可以總結為三個辦法:

  1. 如果每臺消費者機器消費線程是固定的,那么我們可以擴容消費機器和partion,類似銀行排隊增加排隊窗口一樣。
  2. 如果機器和partion是固定的,增加消費線程就是一個比較好的辦法,但是如果是順序消費,就不能通過增加線程數(shù)的方式來提升消費能力,因為順序消費每個partion都是一個單獨的線程,只能通過第一種方式去解決。
  3. 增加自身代碼的消費能力,你想想如果銀行辦事,如果柜員的辦事效率能提升的非常高,那么整個排隊速度肯定也是很快的。

對于第二個問題,如果我們使用線程池模型,如何去解決消息丟失問題,這里我比較推薦的是RocketMQ中的做法,我們之前說了用數(shù)據(jù)庫去保存offset比較復雜,性能還比較差,在RocketMQ中使用了一個TreeMap的結構做了我們上面提到的數(shù)據(jù)庫的事:

 
 
 
 
  1. private final TreeMap msgTreeMap = new TreeMap(); 

這個TreeMap的key是每個message的offset,value就是這條消息的一些信息,TreeMap的底層是使用紅黑樹去實現(xiàn)的,我們可以很快獲取其中的最小值和最大值,當我們每次處理完某一條消息的時候我們會將這條消息從msgTreeMap中移除,

 
 
 
 
  1. public long removeMessage(final List msgs) { 
  2.         long result = -1; 
  3.         final long now = System.currentTimeMillis(); 
  4.         try { 
  5.             this.lockTreeMap.writeLock().lockInterruptibly(); 
  6.             this.lastConsumeTimestamp = now; 
  7.             try { 
  8.                 if (!msgTreeMap.isEmpty()) { 
  9.                     result = this.queueOffsetMax + 1; 
  10.                     int removedCnt = 0; 
  11.                     for (MessageExt msg : msgs) { 
  12.                         MessageExt prev = msgTreeMap.remove(msg.getQueueOffset()); 
  13.                         if (prev != null) { 
  14.                             removedCnt--; 
  15.                             msgSize.addAndGet(0 - msg.getBody().length); 
  16.                         } 
  17.                     } 
  18.                     msgCount.addAndGet(removedCnt); 
  19.  
  20.                     if (!msgTreeMap.isEmpty()) { 
  21.                         result = msgTreeMap.firstKey(); 
  22.                     } 
  23.                 } 
  24.             } finally { 
  25.                 this.lockTreeMap.writeLock().unlock(); 
  26.             } 
  27.         } catch (Throwable t) { 
  28.             log.error("removeMessage exception", t); 
  29.         } 
  30.         return result; 
  31.     } 

removeMessage這個方法就是移除已經消費過的消息,并且返回當前最新的消費offset,這里返回的結果就是msgTreeMap.firstKey(),我們ack給消息隊列server的值其實也是這個,回到我們這個問題上,如果我們發(fā)生重啟,那么其實也不需要擔心我們會出現(xiàn)消息丟失。


當前標題:消息隊列線程池模型如何保證重啟時消息不丟
網頁網址:http://www.5511xx.com/article/djsoecp.html