日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問(wèn)題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷(xiāo)解決方案
RocketMQ如何保證消息的可靠性投遞?

介紹

要想保證消息的可靠型投遞,無(wú)非保證如下3個(gè)階段的正常執(zhí)行即可。

  1. 生產(chǎn)者將消息成功投遞到broker
  2. broker將投遞過(guò)程的消息持久化下來(lái)
  3. 消費(fèi)者能從broker消費(fèi)到消息

發(fā)送端消息重試

producer向broker發(fā)送消息后,沒(méi)有收到broker的ack時(shí),rocketmq會(huì)自動(dòng)重試。重試的次數(shù)可以設(shè)置,默認(rèn)為2次

 
 
 
 
  1. DefaultMQProducer producer = new DefaultMQProducer(RPODUCER_GROUP_NAME); 
  2. // 同步發(fā)送設(shè)置重試次數(shù)為5次 
  3. producer.setRetryTimesWhenSendFailed(5); 
  4. // 異步發(fā)送設(shè)置重試次數(shù)為5次 
  5. producer.setRetryTimesWhenSendAsyncFailed(5); 

消息持久化

我們先來(lái)了解一下消息的存儲(chǔ)流程,這個(gè)知識(shí)對(duì)后面分析消費(fèi)端消息重試非常重要。

和消息相關(guān)的文件有如下幾種

  1. CommitLog:存儲(chǔ)消息的元數(shù)據(jù)
  2. ConsumerQueue:存儲(chǔ)消息在CommitLog的索引
  3. IndexFile:可以通過(guò)Message Key,時(shí)間區(qū)間快速查找到消息

整個(gè)消息的存儲(chǔ)流程如下

  1. Producer將消息順序?qū)懙紺ommitLog中
  2. 有一個(gè)線程根據(jù)消息的隊(duì)列信息,寫(xiě)入到相關(guān)的ConsumerQueue中(minOffset為寫(xiě)入的初始位置,consumerOffset為當(dāng)前消費(fèi)到的位置,maxOffset為ConsumerQueue最新寫(xiě)入的位置)和IndexFile
  3. Consumer從ConsumerQueue的consumerOffset讀取到當(dāng)前應(yīng)該消費(fèi)的消息在CommitLog中的偏移量,到CommitLog中找到對(duì)應(yīng)的消息,消費(fèi)成功后移動(dòng)consumerOffset

刷盤(pán)機(jī)制

「異步刷盤(pán)」:消息被寫(xiě)入內(nèi)存的PAGECACHE,返回寫(xiě)成功狀態(tài),當(dāng)內(nèi)存里的消息量積累到一定程度時(shí),統(tǒng)一觸發(fā)寫(xiě)磁盤(pán)操作,快速寫(xiě)入 。吞吐量高,當(dāng)磁盤(pán)損壞時(shí),會(huì)丟失消息

「同步刷盤(pán)」:消息寫(xiě)入內(nèi)存的PAGECACHE后,立刻通知刷盤(pán)線程刷盤(pán),然后等待刷盤(pán)完成,刷盤(pán)線程執(zhí)行完成后喚醒等待的線程,給應(yīng)用返回消息寫(xiě)成功的狀態(tài)。吞吐量低,但不會(huì)造成消息丟失

主從復(fù)制

如果一個(gè)broker有master和slave時(shí),就需要將master上的消息復(fù)制到slave上,復(fù)制的方式有兩種

  1. 「同步復(fù)制」:master和slave均寫(xiě)成功,才返回客戶端成功。maste掛了以后可以保證數(shù)據(jù)不丟失,但是同步復(fù)制會(huì)增加數(shù)據(jù)寫(xiě)入延遲,降低吞吐量
  2. 「異步復(fù)制」:master寫(xiě)成功,返回客戶端成功。擁有較低的延遲和較高的吞吐量,但是當(dāng)master出現(xiàn)故障后,有可能造成數(shù)據(jù)丟失

消費(fèi)端消息重試

順序消息的重試

對(duì)于順序消息,當(dāng)消費(fèi)者消費(fèi)消息失敗后,消息隊(duì)列RocketMQ版會(huì)自動(dòng)不斷地進(jìn)行消息重試(每次間隔時(shí)間為1秒),這時(shí),應(yīng)用會(huì)出現(xiàn)消息消費(fèi)被阻塞的情況。所以一定要做好監(jiān)控,避免阻塞現(xiàn)象的發(fā)生

「順序消息消費(fèi)失敗后不會(huì)消費(fèi)下一條消息而是不斷重試這條消息,應(yīng)該是考慮到如果跨過(guò)這條消息消費(fèi)后面的消息會(huì)對(duì)業(yè)務(wù)邏輯產(chǎn)生影響」

「順序消息暫時(shí)僅支持集群消費(fèi)模式,不支持廣播消費(fèi)模式」

無(wú)序消息的重試

對(duì)于無(wú)序消息(普通、定時(shí)、延時(shí)、事務(wù)消息),當(dāng)消費(fèi)者消費(fèi)消息失敗時(shí),您可以通過(guò)設(shè)置返回狀態(tài)達(dá)到消息重試的結(jié)果。

「無(wú)序消息的重試只針對(duì)集群消費(fèi)方式生效;廣播方式不提供失敗重試特性,即消費(fèi)失敗后,失敗消息不再重試,繼續(xù)消費(fèi)新的消息」

「消費(fèi)時(shí)候后,重試的配置方式有如下三種」

  1. 返回Action.ReconsumeLater(推薦)
  2. 返回Null
  3. 拋出異常
 
 
 
 
  1. public class MessageListenerImpl implements MessageListener { 
  2.  
  3.     @Override 
  4.     public Action consume(Message message, ConsumeContext context) { 
  5.         //消息處理邏輯拋出異常,消息將重試。 
  6.         doConsumeMessage(message); 
  7.         //方式1:返回Action.ReconsumeLater,消息將重試。 
  8.         return Action.ReconsumeLater; 
  9.         //方式2:返回null,消息將重試。 
  10.         return null; 
  11.         //方式3:直接拋出異常,消息將重試。 
  12.         throw new RuntimeException("Consumer Message exception"); 
  13.     } 

「消費(fèi)失敗后,無(wú)需重試的配置方式」

集群消費(fèi)方式下,消息失敗后期望消息不重試,需要捕獲消費(fèi)邏輯中可能拋出的異常,最終返回Action.CommitMessage,此后這條消息將不會(huì)再重試。

 
 
 
 
  1. public class MessageListenerImpl implements MessageListener { 
  2.  
  3.     @Override 
  4.     public Action consume(Message message, ConsumeContext context) { 
  5.         try { 
  6.             doConsumeMessage(message); 
  7.         } catch (Throwable e) { 
  8.             //捕獲消費(fèi)邏輯中的所有異常,并返回Action.CommitMessage; 
  9.             return Action.CommitMessage; 
  10.         } 
  11.         //消息處理正常,直接返回Action.CommitMessage; 
  12.         return Action.CommitMessage; 
  13.     } 

「消息重試次數(shù)」

「RocketMQ默認(rèn)允許每條消息最多重試16次,每次消費(fèi)失敗發(fā)送一條延時(shí)消息到重試隊(duì)列,同一條消息失敗一次將延時(shí)等級(jí)提高一次,然后再放到重試隊(duì)列。重試16次后如果還沒(méi)有消費(fèi)成功,則將消息放到死信隊(duì)列中。」

「注意:重試隊(duì)列和死信隊(duì)列都是按照Consumer Group劃分的」

重試隊(duì)列topic名字:%RETRY% + consumerGroup

死信隊(duì)列topic名字:%DLQ% + consumerGroup

「為什么重試隊(duì)列和死信隊(duì)列要按照Consumer Group來(lái)進(jìn)行劃分?」

「因?yàn)樵赗ocketMQ的時(shí)候使用一定要保持訂閱關(guān)系一致。即一個(gè)Consumer Group訂閱的topic和tag要完全一致,不然可能會(huì)導(dǎo)致消費(fèi)邏輯混亂,消息丟失」

如下任意一種情況都表現(xiàn)為訂閱關(guān)系不一致

  • 相同ConsumerGroup下的Consumer實(shí)例訂閱了不同的Topic。
  • 相同ConsumerGroup下的Consumer實(shí)例訂閱了相同的Topic,但訂閱的Tag不一致。

我們可以通過(guò)控制臺(tái)查看各種類(lèi)型的主題

消息每次重試的間隔時(shí)間如下

第幾次重試 與上次重試的間隔時(shí)間 第幾次重試 與上次重試的間隔時(shí)間

第幾次重試 與上次重試的間隔時(shí)間 第幾次重試 與上次重試的間隔時(shí)間
110 秒97 分鐘
230 秒108 分鐘
31 分鐘119 分鐘
42 分鐘1210 分鐘
53 分鐘1320 分鐘
64 分鐘1430 分鐘
75 分鐘151 小時(shí)
86 分鐘162 小時(shí)

「前面說(shuō)到RocketMQ的消息重試是通過(guò)往重試隊(duì)列發(fā)送定時(shí)消息來(lái)實(shí)現(xiàn)的。」 RocketMQ支持18個(gè)級(jí)別的定時(shí)延時(shí),每個(gè)級(jí)別定時(shí)消息的延時(shí)時(shí)間如下。

 
 
 
 
  1. // MessageStoreConfig.java 
  2. private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"; 

消息重試只是把定時(shí)消息的前2個(gè)級(jí)別去掉,每次發(fā)送下一個(gè)級(jí)別的定時(shí)消息

我們可以設(shè)置消費(fèi)端消息重試次數(shù)

  1. 最大重試次數(shù)小于等于16次,則重試時(shí)間間隔同上表描述。
  2. 最大重試次數(shù)大于16次,超過(guò)16次的重試時(shí)間間隔均為每次2小時(shí)。
 
 
 
 
  1. Properties properties = new Properties(); 
  2. // 配置對(duì)應(yīng)Group ID的最大消息重試次數(shù)為20次,最大重試次數(shù)為字符串類(lèi)型。 
  3. properties.put(PropertyKeyConst.MaxReconsumeTimes,"20"); 
  4. Consumer consumer =ONSFactory.createConsumer(properties); 

「那么重試隊(duì)列中的消息是如何被消費(fèi)的?」

消息消費(fèi)者在啟動(dòng)的時(shí)候,會(huì)訂閱正常的topic和重試隊(duì)列的topic

定時(shí)消息的實(shí)現(xiàn)邏輯也比較簡(jiǎn)單,可以歸納為如下幾步

1.發(fā)送延時(shí)消息

1.1 替換topic為SCHEDULE_TOPIC_XXXX,queueId為消息延遲等級(jí)(如果不替換topic直接發(fā)到對(duì)應(yīng)的consumeQueue中,則消息會(huì)被立馬消費(fèi))

1.2 將消息原來(lái)的topic,queueId放到消息擴(kuò)展屬性中

1.3 將消息應(yīng)該執(zhí)行的時(shí)間放到tagsCode中

將消息順序?qū)懙紺ommitLog中

將消息對(duì)應(yīng)的信息分發(fā)到對(duì)應(yīng)的ConsumerQueue中(topic為SCHEDULE_TOPIC_XXXX總共有18個(gè)queue,對(duì)應(yīng)18個(gè)延遲級(jí)別)

定時(shí)任務(wù)不斷判斷消息是否到達(dá)投遞時(shí)間,沒(méi)有到達(dá)則后續(xù)執(zhí)行投遞

如果到達(dá)投遞時(shí)間,則從commitLog中拉取消息的內(nèi)容,重新設(shè)置消息topic,queueId為原來(lái)的(原來(lái)的topic,queueId在消息擴(kuò)展屬性中),然后將消息投遞到commitLog中,此時(shí)消息就會(huì)被分發(fā)到對(duì)應(yīng)的隊(duì)列中,然后被消費(fèi)。

本文轉(zhuǎn)載自微信公眾號(hào)「Java識(shí)堂」,可以通過(guò)以下二維碼關(guān)注。轉(zhuǎn)載本文請(qǐng)聯(lián)系Java識(shí)堂公眾號(hào)。


網(wǎng)頁(yè)標(biāo)題:RocketMQ如何保證消息的可靠性投遞?
瀏覽地址:http://www.5511xx.com/article/cccshdh.html