日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
深入理解RocketMQ延遲消息

 延遲消息是實際開發(fā)中一個非常有用的功能,本文第一部分從整體上介紹秒級精度延遲消息的實現(xiàn)思路,在第二部分結(jié)合RocketMQ的延遲消息實現(xiàn),進(jìn)行細(xì)致的講解,點出關(guān)鍵部分的源碼。第三步介紹延遲消息與消息重試的關(guān)系。

創(chuàng)新互聯(lián)成立以來不斷整合自身及行業(yè)資源、不斷突破觀念以使企業(yè)策略得到完善和成熟,建立了一套“以技術(shù)為基點,以客戶需求中心、市場為導(dǎo)向”的快速反應(yīng)體系。對公司的主營項目,如中高端企業(yè)網(wǎng)站企劃 / 設(shè)計、行業(yè) / 企業(yè)門戶設(shè)計推廣、行業(yè)門戶平臺運營、手機APP定制開發(fā)、移動網(wǎng)站建設(shè)、微信網(wǎng)站制作、軟件開發(fā)、服務(wù)器托管等實行標(biāo)準(zhǔn)化操作,讓客戶可以直觀的預(yù)知到從創(chuàng)新互聯(lián)可以獲得的服務(wù)效果。

1 延遲消息介紹

基本概念:延遲消息是指生產(chǎn)者發(fā)送消息發(fā)送消息后,不能立刻被消費者消費,需要等待指定的時間后才可以被消費。

場景案例:用戶下了一個訂單之后,需要在指定時間內(nèi)(例如30分鐘)進(jìn)行支付,在到期之前可以發(fā)送一個消息提醒用戶進(jìn)行支付。

一些消息中間件的Broker端內(nèi)置了延遲消息支持的能力,如:

  •  NSQ:這是一個go語言的消息中間件,其通過內(nèi)存中的優(yōu)先級隊列來保存延遲消息,支持秒級精度,最多2個小時延遲。Java中也有對應(yīng)的實現(xiàn),如ScheduledThreadPoolExecutor內(nèi)部實際上也是使用了優(yōu)先級隊列。
  •  QMQ:采用雙重時間輪實現(xiàn)??蓞⒖迹喝我鈺r間延時消息原理講解:設(shè)計與實現(xiàn)
  •  RabbitMQ:需要安裝一個rabbitmq_delayed_message_exchange插件。
  •  RocketMQ:RocketMQ 開源版本延遲消息臨時存儲在一個內(nèi)部主題中,不支持任意時間精度,支持特定的 level,例如定時 5s,10s,1m 等。

Broker端內(nèi)置延遲消息處理能力,核心實現(xiàn)思路都是一樣:將延遲消息通過一個臨時存儲進(jìn)行暫存,到期后才投遞到目標(biāo)Topic中。如下圖所示:

步驟說明如下:

  1.  producer要將一個延遲消息發(fā)送到某個Topic中
  2.  Broker判斷這是一個延遲消息后,將其通過臨時存儲進(jìn)行暫存。
  3.  Broker內(nèi)部通過一個延遲服務(wù)(delay service)檢查消息是否到期,將到期的消息投遞到目標(biāo)Topic中。這個的延遲服務(wù)名字為delay service,不同消息中間件的延遲服務(wù)模塊名稱可能不同。
  4.  消費者消費目標(biāo)topic中的延遲投遞的消息

顯然,臨時存儲模塊和延遲服務(wù)模塊,是延遲消息實現(xiàn)的關(guān)鍵。上圖中,臨時存儲和延遲服務(wù)都是在Broker內(nèi)部實現(xiàn),對業(yè)務(wù)透明。

此外, 還有一些消息中間件原生并不支持延遲消息,如Kafka。在這種情況下,可以選擇對Kafka進(jìn)行改造,但是成本較大。另外一種方式是使用第三方臨時存儲,并加一層代理。

第三方存儲選型要求:

對于第三方臨時存儲,其需要滿足以下幾個特點:

  •  高性能:寫入延遲要低,MQ的一個重要作用是削峰填谷,在選擇臨時存儲時,寫入性能必須要高,關(guān)系型數(shù)據(jù)庫(如Mysql)通常不滿足需求。
  •  高可靠:延遲消息寫入后,不能丟失,需要進(jìn)行持久化,并進(jìn)行備份
  •  支持排序:支持按照某個字段對消息進(jìn)行排序,對于延遲消息需要按照時間進(jìn)行排序。普通消息通常先發(fā)送的會被先消費,延遲消息與普通消息不同,需要進(jìn)行排序。例如先發(fā)一條延遲10s的消息,再發(fā)一條延遲5s的消息,那么后發(fā)送的消息需要被先消費。
  •  支持長時間保存:一些業(yè)務(wù)的延遲消息,需要延遲幾個月,甚至更長,所以延遲消息必須能長時間保留。不過通常不建議延遲太長時間,存儲成本比較大,且業(yè)務(wù)邏輯可能已經(jīng)發(fā)生變化,已經(jīng)不需要消費這些消息。

例如,滴滴開源的消息中間件DDMQ,底層消息中間件的基礎(chǔ)上加了一層代理,獨立部署延遲服務(wù)模塊,使用rocksdb進(jìn)行臨時存儲。rocksdb是一個高性能的KV存儲,并支持排序。

此時對于延遲消息的流轉(zhuǎn)如下圖所示:

說明如下:

  1.  生產(chǎn)者將發(fā)送給producer proxy,proxy判斷是延遲消息,將其投遞到一個緩沖Topic中;
  2.  delay service啟動消費者,用于從緩沖topic中消費延遲消息,以時間為key,存儲到rocksdb中;
  3.  delay service判斷消息到期后,將其投遞到目標(biāo)Topic中。
  4.  消費者消費目標(biāo)topic中的數(shù)據(jù)

這種方式的好處是,因為delay service的延遲投遞能力是獨立于broker實現(xiàn)的,不需要對broker做任何改造,對于任意MQ類型都可以提供支持延遲消息的能力。例如DDMQ對RocketMQ、Kafka都提供了秒級精度的延遲消息投遞能力,但是Kafka本身并不支持延遲消息,而RocketMQ雖然支持延遲消息,但不支持秒級精度。

事實上,DDMQ還提供了很多其他功能,僅僅從延遲消息的角度,完全沒有必要使用這個proxy,直接將消息投遞到緩沖Topic中,之后通過delay service完成延遲投遞邏輯即可。

具體到delay service模塊的實現(xiàn)上,也有一些重要的細(xì)節(jié):

    1.  為了保證服務(wù)的高可用,delay service也是需要部署多個節(jié)點。

    2.  為了保證數(shù)據(jù)不丟失,每個delay service節(jié)點都需要消費緩沖Topic中的全量數(shù)據(jù),保存到各自的持久化存儲中,這樣就有了多個備份,并需要以時間為key。不過因為是各自拉取,并不能保證強一致。如果一定要強一致,那么delay service就不需要內(nèi)置存儲實現(xiàn),可以借助于其他支持強一致的存儲。

    3.  為了避免重復(fù)投遞,delay service需要進(jìn)行選主,可以借助于zookeeper、etcd等實現(xiàn)。只有master可以通過生產(chǎn)者投遞到目標(biāo)Topic中,其他節(jié)點處于備用狀態(tài)。否則,如果每個節(jié)點進(jìn)行都投遞,那么延遲消息就會被投遞多次,造成消費重復(fù)。

    4.  master要記錄自己當(dāng)前投遞到的時間到一個共享存儲中,如果master掛了,從slave節(jié)點中選出一個新的master節(jié)點,從之前記錄時間繼續(xù)開始投遞。

    5.  延遲消息的取消:一些延遲消息在未到期之前,可能希望進(jìn)行取消。通常取消邏輯實現(xiàn)較為復(fù)雜,且不夠精確。對于那些已經(jīng)快要到期的消息,可能還未取消之前,已經(jīng)發(fā)送出去了,因此需要在消費者端做檢查,才能萬無一失。

2 RocketMQ中的延遲消息

開源RocketMQ支持延遲消息,但是不支持秒級精度。默認(rèn)支持18個level的延遲消息,這是通過broker端的messageDelayLevel配置項確定的,如下:

 
 
 
 
  1. messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 

Broker在啟動時,內(nèi)部會創(chuàng)建一個內(nèi)部主題:SCHEDULE_TOPIC_XXXX,根據(jù)延遲level的個數(shù),創(chuàng)建對應(yīng)數(shù)量的隊列,也就是說18個level對應(yīng)了18個隊列。注意,這并不是說這個內(nèi)部主題只會有18個隊列,因為Broker通常是集群模式部署的,因此每個節(jié)點都有18個隊列。

延遲級別的值可以進(jìn)行修改,以滿足自己的業(yè)務(wù)需求,可以修改/添加新的level。例如:你想支持2天的延遲,修改最后一個level的值為2d,這個時候依然是18個level;也可以增加一個2d,這個時候總共就有19個level。

可以看到這里并不支持秒級精度,按照《rocketmq developer guide》中的說法,是為了避免在broker對消息進(jìn)行排序,造成性能影響。不過筆者考慮,之所以不支持,更多應(yīng)該是商業(yè)上的考慮。

生產(chǎn)者發(fā)送延遲消息:

生產(chǎn)者在發(fā)送延遲消息非常簡單,只需要設(shè)置一個延遲級別即可,注意不是具體的延遲時間,如:

 
 
 
 
  1. Message msg=new Message();  
  2. msg.setTopic("TopicA");  
  3. msg.setTags("Tag");  
  4. msg.setBody("this is a delay message".getBytes());  
  5. //設(shè)置延遲level為5,對應(yīng)延遲1分鐘  
  6. msg.setDelayTimeLevel(5);  
  7. producer.send(msg); 

如果設(shè)置的延遲level超過最大值,那么將會重置最最大值。

Broker端存儲延遲消息:

延遲消息在RocketMQ Broker端的流轉(zhuǎn)如下圖所示:

可以看到,總共有6個步驟,下面會對這6個步驟進(jìn)行詳細(xì)的講解:

  1.  修改消息Topic名稱和隊列信息
  2.  轉(zhuǎn)發(fā)消息到延遲主題的CosumeQueue中
  3.  延遲服務(wù)消費SCHEDULE_TOPIC_XXXX消息
  4.  將信息重新存儲到CommitLog中
  5.  將消息投遞到目標(biāo)Topic中
  6.  消費者消費目標(biāo)topic中的數(shù)據(jù)

第一步:修改消息Topic名稱和隊列信息

RocketMQ Broker端在存儲生產(chǎn)者寫入的消息時,首先都會將其寫入到CommitLog中。之后根據(jù)消息中的Topic信息和隊列信息,將其轉(zhuǎn)發(fā)到目標(biāo)Topic的指定隊列(ConsumeQueue)中。

由于消息一旦存儲到ConsumeQueue中,消費者就能消費到,而延遲消息不能被立即消費,所以這里將Topic的名稱修改為SCHEDULE_TOPIC_XXXX,并根據(jù)延遲級別確定要投遞到哪個隊列下。

同時,還會將消息原來要發(fā)送到的目標(biāo)Topic和隊列信息存儲到消息的屬性中。相關(guān)源碼如下所示:

org.apache.rocketmq.store.CommitLog#putMessage

第二步:轉(zhuǎn)發(fā)消息到延遲主題的CosumeQueue中

CommitLog中的消息轉(zhuǎn)發(fā)到CosumeQueue中是異步進(jìn)行的。在轉(zhuǎn)發(fā)過程中,會對延遲消息進(jìn)行特殊處理,主要是計算這條延遲消息需要在什么時候進(jìn)行投遞。

 
 
 
 
  1. 投遞時間=消息存儲時間(storeTimestamp) + 延遲級別對應(yīng)的時間 

需要注意的是,會將計算出的投遞時間當(dāng)做消息Tag的哈希值存儲到CosumeQueue中,CosumeQueue單個存儲單元組成結(jié)構(gòu)如下圖所示:

其中:

  •  Commit Log Offset:記錄在CommitLog中的位置。
  •  Size:記錄消息的大小
  •  Message Tag HashCode:記錄消息Tag的哈希值,用于消息過濾。特別的,對于延遲消息,這個字段記錄的是消息的投遞時間戳。這也是為什么java中hashCode方法返回一個int型,只占用4個字節(jié),而這里Message Tag HashCode字段卻設(shè)計成8個字節(jié)的原因。

相關(guān)源碼參見:

CommitLog#checkMessageAndReturnSize

第三步:延遲服務(wù)消費SCHEDULE_TOPIC_XXXX消息

Broker內(nèi)部有一個ScheduleMessageService類,其充當(dāng)延遲服務(wù),消費SCHEDULE_TOPIC_XXXX中的消息,并投遞到目標(biāo)Topic中。

ScheduleMessageService在啟動時,其會創(chuàng)建一個定時器Timer,并根據(jù)延遲級別的個數(shù),啟動對應(yīng)數(shù)量的TimerTask,每個TimerTask負(fù)責(zé)一個延遲級別的消費與投遞。

相關(guān)源碼如下所示:

ScheduleMessageService#start

需要注意的是,每個TimeTask在檢查消息是否到期時,首先檢查對應(yīng)隊列中尚未投遞第一條消息,如果這條消息沒到期,那么之后的消息都不會檢查。如果到期了,則進(jìn)行投遞,并檢查之后的消息是否到期。

第四步:將信息重新存儲到CommitLog中

在將消息到期后,需要投遞到目標(biāo)Topic。由于在第一步已經(jīng)記錄了原來的Topic和隊列信息,因此這里重新設(shè)置,再存儲到CommitLog即可。此外,由于之前Message Tag HashCode字段存儲的是消息的投遞時間,這里需要重新計算tag的哈希值后再存儲。

源碼參見:DeliverDelayedMessageTimerTask的messageTimeup方法。

第五步:將消息投遞到目標(biāo)Topic中

這一步與第二步類似,不過由于消息的Topic名稱已經(jīng)改為了目標(biāo)Topic。因此消息會直接投遞到目標(biāo)Topic的ConsumeQueue中,之后消費者即消費到這條消息。

3 延遲消息與消費重試的關(guān)系

RocketMQ提供了消息重試的能力,在并發(fā)模式消費消費失敗的情況下,可以返回一個枚舉值RECONSUME_LATER,那么消息之后將會進(jìn)行重試。如:

 
 
 
 
  1. consumer.registerMessageListener(new MessageListenerConcurrently() {  
  2.        @Override  
  3.        public ConsumeConcurrentlyStatus consumeMessage(List msgs,  
  4.                                        ConsumeConcurrentlyContext context) {  
  5.            //處理消息,失敗,返回RECONSUME_LATER,進(jìn)行重試  
  6.            return ConsumeConcurrentlyStatus.RECONSUME_LATER;  
  7.        } 
  8.     }); 

重試默認(rèn)會進(jìn)行重試16次。使用過RocketMQ消息重試功能的用戶,可能看到過以下這張圖:

第幾次重試 與上次重試的間隔時間 第幾次重試 與上次重試的間隔時間
1 10 秒 9 7 分鐘
2 30 秒 10 8 分鐘
3 1 分鐘 11 9 分鐘
4 2 分鐘 12 10 分鐘
5 3 分鐘 13 20 分鐘
6 4 分鐘 14 30 分鐘
7 5 分鐘 15 1 小時
8 6 分鐘 16 2 小時

細(xì)心地的讀者發(fā)現(xiàn)了,消息重試的16個級別,實際上是把延遲消息18個級別的前兩個level去掉了。事實上,RocketMQ的消息重試也是基于延遲消息來完成的。在消息消費失敗的情況下,將其重新當(dāng)做延遲消息投遞回Broker。

在投遞回去時,會跳過前兩個level,因此只重試16次。當(dāng)然,消息重試還有一些其他的設(shè)計邏輯,在之后的文章將會進(jìn)行分析。


分享標(biāo)題:深入理解RocketMQ延遲消息
本文來源:http://www.5511xx.com/article/cohdiih.html