日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢(xún)
選擇下列產(chǎn)品馬上在線(xiàn)溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問(wèn)題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷(xiāo)解決方案
沒(méi)想到RocketMQ的tag還有這個(gè)“坑”!

今天我就與RocketMQ Tag幾個(gè)值得關(guān)注的問(wèn)題,和大家來(lái)做一個(gè)分享,看過(guò)后的朋友,如果覺(jué)得有幫助,期待你的點(diǎn)贊支持。

網(wǎng)站建設(shè)哪家好,找創(chuàng)新互聯(lián)建站!專(zhuān)注于網(wǎng)頁(yè)設(shè)計(jì)、網(wǎng)站建設(shè)、微信開(kāi)發(fā)、微信小程序開(kāi)發(fā)、集團(tuán)企業(yè)網(wǎng)站建設(shè)等服務(wù)項(xiàng)目。為回饋新老客戶(hù)創(chuàng)新互聯(lián)還提供了保亭黎族免費(fèi)建站歡迎大家使用!

  • 消費(fèi)組訂閱關(guān)系不一致為什么會(huì)到來(lái)消息丟失?
  • 如果一個(gè)tag的消息數(shù)量很少,是否會(huì)顯示很高的延遲?

1.消費(fèi)組訂閱關(guān)系不一致導(dǎo)致消息丟失

從消息消費(fèi)的視角來(lái)看消費(fèi)組是一個(gè)基本的物理隔離單位,每一個(gè)消費(fèi)組擁有自己的消費(fèi)位點(diǎn)、消費(fèi)線(xiàn)程池等。

RocketMQ的初學(xué)者容易犯這樣一個(gè)錯(cuò)誤:消費(fèi)組中的不同消費(fèi)者,訂閱同一個(gè)topic的不同的tag,這樣會(huì)導(dǎo)致消息丟失(部分消息沒(méi)有消費(fèi)),在思考這個(gè)問(wèn)題時(shí),我們不妨先來(lái)看一張圖:

簡(jiǎn)單闡述一下其核心關(guān)鍵點(diǎn):

  • 例如一個(gè)Topic共有4個(gè)隊(duì)列。
  • 消息發(fā)送者連續(xù)發(fā)送4條tagA的消息后,再連續(xù)發(fā)送4條tagb的消息,消息發(fā)送者端默認(rèn)采取輪循的負(fù)載均衡機(jī)制,這樣topic的每一個(gè)隊(duì)列中都存在tagA、tabB兩個(gè)tag的消息。
  • 消費(fèi)組dw_tag_test的IP為192.168.3.10的消費(fèi)者訂閱tagA,另外一個(gè)IP為192.168.3.11的消費(fèi)者訂閱tagB。
  • 消費(fèi)組內(nèi)的消費(fèi)者在進(jìn)行消息消費(fèi)之前,首先會(huì)進(jìn)行隊(duì)列負(fù)載,默認(rèn)為平均分配,分配結(jié)果:

消費(fèi)者然后向Broker發(fā)起消息拉取請(qǐng)求,192.168.3.10消費(fèi)者會(huì)由于只訂閱了tagA,這樣存在q0、q1中的tagB的消息會(huì)被過(guò)濾,但被過(guò)濾的tagB并不會(huì)投遞到另外一個(gè)訂閱了tagB的消費(fèi)者,造成這部分消息沒(méi)有被投遞,從而導(dǎo)致消息丟失。

同樣192.168.3.11消費(fèi)者會(huì)由于只訂閱了tagB,這樣存在q2、q3中的tagA的消息會(huì)被過(guò)濾,但被過(guò)濾的tagA并不會(huì)投遞到另外一個(gè)訂閱了tagA的消費(fèi)者,造成這部分消息沒(méi)有被投遞,從而導(dǎo)致消息丟失。

192.168.3.10 分配到q0、q1。

192.168.3.11 分配到q2、q3。

2.如果一個(gè)tag的消息數(shù)量很少,是否會(huì)顯示很高的延遲?

開(kāi)篇有群友會(huì)存在這樣一個(gè)擔(dān)憂(yōu),其場(chǎng)景大概如下圖所示:

消費(fèi)者在消費(fèi)offset=100的這條tag1消息后,后面連續(xù)出現(xiàn)1000W條非tag1的消息,這個(gè)消費(fèi)組的積壓會(huì)持續(xù)增加,直接到1000W嗎?

要想明白這個(gè)問(wèn)題,我們至少應(yīng)該要重點(diǎn)去查看如下幾個(gè)功能的源碼:

  • 消息拉取流程
  • 位點(diǎn)提交機(jī)制

本文將從以問(wèn)題為導(dǎo)向,經(jīng)過(guò)自己的思考,并找到關(guān)鍵源碼加以求證,最后進(jìn)行簡(jiǎn)單的示例代碼進(jìn)行驗(yàn)證。

遇到問(wèn)題之前,我們可以先嘗試思考一下,如果這個(gè)功能要我們實(shí)現(xiàn),我們大概會(huì)怎么去思考?

要判斷消費(fèi)組在消費(fèi)為offset=100的消息后,在接下來(lái)1000W條消息都會(huì)被過(guò)濾的情況下,如果我們希望位點(diǎn)能夠提交,我們應(yīng)該怎么設(shè)計(jì)?我覺(jué)得應(yīng)該至少有如下幾個(gè)關(guān)鍵點(diǎn):

  • 消息消息拉取時(shí)連續(xù)1000W條消息找不到合適的消息,服務(wù)端會(huì)如何處理
  • 客戶(hù)端拉取到消息與未拉取到消息兩種情況如何提交位點(diǎn)

2.1 消息拉取流程中的關(guān)鍵設(shè)計(jì)

客戶(hù)端向服務(wù)端拉取消息,連續(xù)1000W條消息都不符合條件,一次過(guò)濾查找這么多消息,肯定非常耗時(shí),客戶(hù)端也不能等待這么久,那服務(wù)端必須采取措施,必須觸發(fā)一個(gè)停止查找的條件并向客戶(hù)端返回NO_MESSAGE,客戶(hù)端在消息查找時(shí)會(huì)等待多久呢?

核心關(guān)鍵點(diǎn)一:客戶(hù)端在向服務(wù)端發(fā)起消息拉取請(qǐng)求時(shí)會(huì)設(shè)置超時(shí)時(shí)間,代碼如下所示:

其中與超時(shí)時(shí)間相關(guān)的兩個(gè)變量,其含義分別:

  • long brokerSuspendMaxTimeMillis 在當(dāng)前沒(méi)有符合的消息時(shí)在Broker端允許掛起的時(shí)間,默認(rèn)為15s,暫時(shí)不支持自定義。
  • long timeoutMillis 消息拉取的超時(shí)時(shí)間,默認(rèn)為30s,暫時(shí)不支持自定義。

即一次消息拉取最大的超時(shí)時(shí)間為30s。

核心關(guān)鍵點(diǎn)二:Broker端在處理消息拉取時(shí)設(shè)置了完備的退出條件,具體由DefaultMessageStore的getMessage方法事項(xiàng),具體代碼如下所述:

核心要點(diǎn):

  • 首先客戶(hù)端在發(fā)起時(shí)會(huì)傳入一個(gè)本次期望拉取的消息數(shù)量,對(duì)應(yīng)上述代碼中的maxMsgNums,如果拉取到指定條數(shù)到消息(讀者朋友們?nèi)珞w代碼讀者可以查閱isTheBatchFull方法),則正常退出。
  • 另外一個(gè)非常關(guān)鍵的過(guò)濾條件,即一次消息拉取過(guò)程中,服務(wù)端最大掃描的索引字節(jié)數(shù),即一次拉取掃描ConsumeQueue的字節(jié)數(shù)量,取16000與期望拉取條數(shù)乘以20,因?yàn)橐粋€(gè)consumequeue條目占20個(gè)字節(jié)。
  • 服務(wù)端還蘊(yùn)含了一個(gè)長(zhǎng)輪循機(jī)制,即如果掃描了指定的字節(jié)數(shù),但一條消息都沒(méi)查詢(xún)到,會(huì)在broker端掛起一段時(shí)間,如果有新消息到來(lái)并符合過(guò)濾條件,則會(huì)喚醒,向客戶(hù)端返回消息。

回到這個(gè)問(wèn)題,如果服務(wù)端連續(xù)1000W條非tag1的消息,拉取請(qǐng)求不會(huì)一次性篩選,而是會(huì)返回,不至于讓客戶(hù)端超時(shí)。

從這里可以打消第一個(gè)顧慮:服務(wù)端在沒(méi)有找到消息時(shí)不會(huì)傻傻等待不返回,接下來(lái)看是否會(huì)有積壓的關(guān)鍵是看如何提交位點(diǎn)。

2.2 位點(diǎn)提交機(jī)制

2.2.1 客戶(hù)端拉取到合適的消息位點(diǎn)提交機(jī)制

Pull線(xiàn)程從服務(wù)端拉取到結(jié)構(gòu)后會(huì)將消息提交到消費(fèi)組線(xiàn)程池,主要定義在DefaultMQPushConsumerImpl的PullTask類(lèi)中,具體代碼如下所示:

眾所周知,RocketMQ是在消費(fèi)成功后進(jìn)行位點(diǎn)提交,代碼在ConsumeMessageConcurrentlyService中,如下所示:

這里的核心要點(diǎn):

消費(fèi)端成功消息完消費(fèi)后,會(huì)采用最小位點(diǎn)提交機(jī)制,確保消費(fèi)不丟失。

最小位點(diǎn)提交機(jī)制,其實(shí)就是將拉取到的消息放入一個(gè)TreeMap中,然后消費(fèi)線(xiàn)程成功消費(fèi)一條消息后,將該消息從TreeMap中移除,再計(jì)算位點(diǎn):

如果當(dāng)前TreeMap中還有消息在處理,則返回TreeMap中的第一條消息(最小位點(diǎn))

如果當(dāng)前TreeMap中已沒(méi)有消息處理,返回的位點(diǎn)為this.queueOffsetMax,queueOffsetMax的表示的是當(dāng)前消費(fèi)隊(duì)列中拉取到的最大消費(fèi)位點(diǎn),因?yàn)榇藭r(shí)拉取到的消息全部消費(fèi)了。

最后調(diào)用updateoffset方法,更新本地的位點(diǎn)緩存(有定時(shí)持久機(jī)制)

2.2.2 客戶(hù)端沒(méi)有拉取到合適的消息位點(diǎn)提交機(jī)制

客戶(hù)端如果沒(méi)有拉取到合適的消息,例如全部被tag過(guò)濾了,在DefaultMqPushConsumerImpl的PullTask中定義了處理方式,具體如下所示:

其關(guān)鍵代碼在correctTasOffset中,具體代碼請(qǐng)看:

核心要點(diǎn):如果此時(shí)處理隊(duì)列中的消息為0時(shí),則會(huì)將下一次拉取偏移量當(dāng)成位點(diǎn),而這個(gè)值在服務(wù)端進(jìn)行消息查找時(shí)會(huì)向前驅(qū)動(dòng),代碼在DefaultMessageStore的getMessage中:

故從這里可以看到,就算消息全部過(guò)濾掉了,位點(diǎn)還是會(huì)向前驅(qū)動(dòng)的,不會(huì)造成大量積壓。

2.2.3 消息拉取時(shí)會(huì)附帶一次位點(diǎn)提交

其實(shí)RocketMQ的位點(diǎn)提交,客戶(hù)端提交位點(diǎn)時(shí)會(huì)先存儲(chǔ)在本地緩存中,然后定時(shí)將位點(diǎn)信息一次性提交到Broker端,其實(shí)還存在另外一種較為隱式位點(diǎn)提交機(jī)制:

即在消息拉取時(shí),如果本地緩存中存在位點(diǎn)信息,會(huì)設(shè)置一個(gè)系統(tǒng)標(biāo)記:FLAG_COMMIT_OFFSET,該標(biāo)記在服務(wù)端會(huì)觸發(fā)一次位點(diǎn)提交,具體代碼如下:

2.2.4 總結(jié)與驗(yàn)證

綜上述所述,使用TAG并不會(huì)因?yàn)閷?duì)應(yīng)tag數(shù)量比較少,從而造成大量積壓的情況。

為了驗(yàn)證這個(gè)觀點(diǎn),我也做了一個(gè)簡(jiǎn)單的驗(yàn)證,具體方法是啟動(dòng)一個(gè)消息發(fā)送者,向指定topic發(fā)送tag B的消息,而消費(fèi)者只訂閱tag A,但消費(fèi)者并不會(huì)出現(xiàn)消費(fèi)積壓,測(cè)試代碼如下圖所示:

查看消費(fèi)組積壓情況如下圖所示:


名稱(chēng)欄目:沒(méi)想到RocketMQ的tag還有這個(gè)“坑”!
文章分享:http://www.5511xx.com/article/dpcjddi.html