新聞中心
分布式事務是指事務的參與者、支持事務的服務器、資源服務器以及事務管理器分別位于不同的分布式系統(tǒng)的不同節(jié)點之上。例如在大型電商系統(tǒng)中,下單接口通常會扣減庫存、減去優(yōu)惠、生成訂單 id, 而訂單服務與庫存、優(yōu)惠、訂單 id 都是不同的服務,下單接口的成功與否,不僅取決于本地的 db 操作,而且依賴第三方系統(tǒng)的結(jié)果,這時候分布式事務就保證這些操作要么全部成功,要么全部失敗。本質(zhì)上來說,分布式事務就是為了保證不同數(shù)據(jù)庫的數(shù)據(jù)一致性。

創(chuàng)新互聯(lián)公司專注為客戶提供全方位的互聯(lián)網(wǎng)綜合服務,包含不限于做網(wǎng)站、成都網(wǎng)站設計、洛浦網(wǎng)絡推廣、重慶小程序開發(fā)、洛浦網(wǎng)絡營銷、洛浦企業(yè)策劃、洛浦品牌公關(guān)、搜索引擎seo、人物專訪、企業(yè)宣傳片、企業(yè)代運營等,從售前售中售后,我們都將竭誠為您服務,您的肯定,是我們最大的嘉獎;創(chuàng)新互聯(lián)公司為所有大學生創(chuàng)業(yè)者提供洛浦建站搭建服務,24小時服務熱線:18982081108,官方網(wǎng)址:www.cdcxhl.com
目前解決分布式事物的解決方案有seata,lcn 等。
RocketMQ 分布式事物實現(xiàn)
RocketMQ提供了事務消息的功能,采用2PC(兩段式協(xié)議)+補償機制(事務回查)的分布式事務功能,通過消息隊列 RocketMQ 版事務消息能達到分布式事務的最終一致。
首先,我們要知道什么是半事物消息和消息回查:
- 半事務消息:
暫不能投遞的消息,發(fā)送方已經(jīng)成功地將消息發(fā)送到了消息隊列 RocketMQ 版服務端,但是服務端未收到生產(chǎn)者對該消息的二次確認,此時該消息被標記成“暫不能投遞”狀態(tài),處于該種狀態(tài)下的消息即半事務消息。
- 消息回查:
由于網(wǎng)絡閃斷、生產(chǎn)者應用重啟等原因,導致某條事務消息的二次確認丟失,消息隊列 RocketMQ 版服務端通過掃描發(fā)現(xiàn)某條消息長期處于“半事務消息”時,需要主動向消息生產(chǎn)者詢問該消息的最終狀態(tài)(Commit 或是 Rollback),該詢問過程即消息回查。
【交互流程】
事務消息發(fā)送步驟如下:
- 發(fā)送方將半事務消息發(fā)送至消息隊列 RocketMQ 版服務端。
- 消息隊列 RocketMQ 版服務端將消息持久化成功之后,向發(fā)送方返回 Ack。確認消息已經(jīng)發(fā)送成功,此時消息為半事務消息。
- 發(fā)送方開始執(zhí)行本地事務邏輯。
- 發(fā)送方根據(jù)本地事務執(zhí)行結(jié)果向服務端提交二次確認(Commit 或是 Rollback),服務端收到 Commit 狀態(tài)則將半事務消息標記為可投遞,訂閱方最終將收到該消息;服務端收到 Rollback 狀態(tài)則刪除半事務消息,訂閱方將不會接受該消息。
事務消息回查步驟如下:
- 在斷網(wǎng)或者是應用重啟的特殊情況下,上述步驟 4 提交的二次確認最終未到達服務端,經(jīng)過固定時間后服務端將對該消息發(fā)起消息回查。
- 發(fā)送方收到消息回查后,需要檢查對應消息的本地事務執(zhí)行的最終結(jié)果。
- 發(fā)送方根據(jù)檢查得到的本地事務的最終狀態(tài)再次提交二次確認,服務端仍按照步驟 4 對半事務消息進行操作。
總體而言RocketMQ事務消息分為兩條主線:
- 發(fā)送流程:發(fā)送half message(半消息),執(zhí)行本地事務,發(fā)送事務執(zhí)行結(jié)果
- 定時任務回查流程:MQ定時任務掃描半消息,回查本地事務,發(fā)送事務執(zhí)行結(jié)果
源碼相關(guān)
Producer發(fā)送事務半消息的(prepare)
在本地應用發(fā)送事務消息的核心類是TransactionMQProducer,該類通過繼承DefaultMQProducer來復用大部分發(fā)送消息相關(guān)的邏輯,這個類的代碼量非常少只有100來行,下面是這個類的sendMessageTransaction方法
這里的transactionListener就是上面所說的消息回查的類,它提供了2個方法:
- executeLocalTransaction
執(zhí)行本地事務
- checkLocalTransaction
回查本地事務
接著看DefaultMQProducer.sendMessageInTransaction()方法:
該方法主要做了以下事情
- 給消息打上事務消息相關(guān)的tag,用于broker區(qū)分普通消息和事務消息
- 發(fā)送半消息(half message)
- 發(fā)送成功則由transactionListener執(zhí)行本地事務
- 執(zhí)行endTransaction方法,告訴 broker 執(zhí)行 commit/rollback。
執(zhí)行本地事務
Producer 半事務消息發(fā)送成功后,會調(diào)用transactionListener.executeLocalTransaction方法執(zhí)行本地事務。只有半消息發(fā)送成功后,才會執(zhí)行本地事務,如果半消息發(fā)送失敗,則設置回滾。
結(jié)束事務(commit/rollback)
本地事務執(zhí)行后,則調(diào)用this.endTransaction()方法,根據(jù)本地事務執(zhí)行狀態(tài),去提交事務或者回滾事務。
如果半消息發(fā)送失敗或本地事務執(zhí)行失敗告訴服務端是刪除半消息,半消息發(fā)送成功且本地事務執(zhí)行成功則告訴服務端生效半消息
Broker端處理事務消息
Broker端通過SendMessageProcessor.processRequest()方法接收處理 Producer 發(fā)送的消息 最后會調(diào)用到SendMessageProcessor.sendMessage(),判斷消息類型,進行消息存儲。
存儲半消息
代碼 prepareMessage(msgInner) :
在這一步,備份消息的原主題名稱與原隊列ID,然后取消事務消息的消息標簽,重新設置消息的主題為:RMQ_SYS_TRANS_HALF_TOPIC,隊列ID固定為0。與其他普通消息區(qū)分開,然后完成消息持久化。
到這里,Broker 就初步處理完了 Producer 發(fā)送的事務半消息。
半消息事務回查
兩段式協(xié)議發(fā)送與提交回滾消息,執(zhí)行完本地事務消息的狀態(tài)為UNKNOW時,結(jié)束事務不做任何操作。通過事務狀態(tài)定時回查得到發(fā)送端的事務狀態(tài)是rollback或commit。
通過TransactionalMessageCheckService線程定時去檢測RMQ_SYS_TRANS_HALF_TOPIC主題中的消息,回查消息的事務狀態(tài)。
- RMQ_SYS_TRANS_HALF_TOPIC
prepare消息的主題,事務消息首先先進入到該主題。
- RMQ_SYS_TRANS_OP_HALF_TOPIC
當消息服務器收到事務消息的提交或回滾請求后,會將消息存儲在該主題下。
Broker處理END_TRANSACTION
當Producer或者回查定時任務提交/回滾事務的時候,Broker如何處理事務消息提交、回滾命令的?其核心實現(xiàn)如下:
- 根據(jù)commitlogOffset找到消息
- 如果是提交動作,就恢復原消息的主題與隊列,再次存入commitlog文件進而轉(zhuǎn)到消息消費隊列,供消費者消費,然后將原預處理消息存入一個新的主題RMQ_SYS_TRANS_OP_HALF_TOPIC,代表該消息已被處理
- 回滾消息,則直接將原預處理消息存入一個新的主題RMQ_SYS_TRANS_OP_HALF_TOPIC,代表該消息已被處理。
整體實現(xiàn)流程
如果消費端消費失敗了怎么辦?
如果有消息消費失敗了,則將失敗的消息回傳給broker,即重新寫入commitLog文件,消費者將重新消費;如果消息回傳的時候,consumer和broker之間網(wǎng)絡斷開,則consumer會調(diào)用submitConsumeRequestLater()方法,在consumer端進行重新消費,如果仍然消費失敗,會不斷重試直到達到默認的16次,你可以使用msg.getReconsumeTimes()方法來獲取當前重試次數(shù),如果重試次數(shù)足夠多之后仍然無法消費成功,必須通過工單、日志等方式進行人工干預以讓producer事務進行回退處理。
Producer發(fā)送半消息失敗
可能由于網(wǎng)絡或者mq故障,導致 Producer 訂單系統(tǒng) 發(fā)送半消息(prepare)失敗。
這時訂單系統(tǒng)可以執(zhí)行回滾操作,比如“訂單關(guān)閉”等,走逆向流程退款給用戶。
半消息發(fā)送成功,本地事務執(zhí)行失敗
如果訂單系統(tǒng)發(fā)送的半消息成功了,但是執(zhí)行本地事務失敗了,如更新訂單狀態(tài)為“已完成”。
這種情況下,執(zhí)行本地事務失敗后,會返回rollback給 MQ,MQ會刪除之前發(fā)送的半消息。 也就不會調(diào)用優(yōu)惠券系統(tǒng)了。
半消息發(fā)送成功,沒收到MQ返回的響應
假如訂單系統(tǒng)發(fā)送半消息成功后,沒有收到MQ返回的響應。
這個時候可能是因為網(wǎng)絡問題,或者其他異常報錯,訂單系統(tǒng)誤以為發(fā)送MQ半消息失敗,執(zhí)行了逆向回滾流程。
但這個時候其實mq已經(jīng)保存半消息成功了,那這個消息怎么處理?
這個時候MQ的后臺消息回查定時任務TransactionalMessageCheckService會每隔1分鐘掃描一次半消息隊列,判斷是否需要消息回查,然后回查訂單系統(tǒng)的本地事務,這時MQ就會發(fā)現(xiàn)訂單已經(jīng)變成“已關(guān)閉”,此時就要發(fā)送rollback請求給mq,刪除之前的半消息。
如果commit/rollback失敗了
這個其實也是通過定時任務TransactionalMessageCheckService,它會發(fā)現(xiàn)這個消息超過一定時間還沒有進行二階段處理,就會回查本地事務。
小結(jié)
消息隊列RocketMQ分布式事務消息不僅可以實現(xiàn)應用之間的解耦,又能保證數(shù)據(jù)的最終一致性。同時,傳統(tǒng)的大事務可以被拆分為小事務,不僅能提升效率,還不會因為某一個關(guān)聯(lián)應用的不可用導致整體回滾,從而最大限度保證核心系統(tǒng)的可用性。在極端情況下,如果關(guān)聯(lián)的某一個應用始終無法處理成功,也只需對當前應用進行補償或數(shù)據(jù)訂正處理,而無需對整體業(yè)務進行回滾。
從RocketMQ事務型消息鏈路體現(xiàn)了面向失敗的設計思路,也體現(xiàn)了事務型系統(tǒng)的嚴謹性,在第二階段的消息沒有送達的時候,broker會主動請求producer端去做check,producer做完check后會將事務的狀態(tài)再次返回。雖然說實現(xiàn)最終一致的方案有很多,但是事務型消息是比較優(yōu)雅實現(xiàn)方式之一。
本文轉(zhuǎn)載自微信公眾號「小汪哥寫代碼」,可以通過以下二維碼關(guān)注。轉(zhuǎn)載本文請聯(lián)系小汪哥寫代碼公眾號。
新聞名稱:事物消息的實現(xiàn)-RocketMQ知識體系6
文章出自:http://www.5511xx.com/article/djcgoih.html


咨詢
建站咨詢
