新聞中心
大家好,我是君哥。

今天來(lái)分享 RocketMQ 中一個(gè)關(guān)鍵的知識(shí)點(diǎn),消費(fèi)者的啟動(dòng)過(guò)程。
多數(shù)消息隊(duì)列中,消費(fèi)者和 Broker 通信的方式有兩種,PUSH 模式和 PULL 模式:
- PUSH 模式:Broker 主動(dòng)把消息推送給訂閱的消費(fèi)者。
- PULL模式:消費(fèi)者主動(dòng)從 Broker 拉取消息。注意,RocketMQ 并沒有真正實(shí)現(xiàn) PUSH 模式, RocketMQ 中的 PUSH 。 模式,本質(zhì)上也是 PULL 模式,只是消費(fèi)端封裝了輪詢過(guò)程,相當(dāng)于開啟一個(gè)定時(shí)線程不停地從 Broker 拉取消息,拉取到消息后喚醒本地業(yè)務(wù)線程來(lái)處理。本文講解 PULL 模式的啟動(dòng)過(guò)程。涉及到到的啟動(dòng)過(guò)程如下圖:
首先看下面這張圖:
圖中可以看出,消費(fèi)者需要注冊(cè)到 Name Server,拉取消息的時(shí)候可以從 Broker 主節(jié)點(diǎn)拉取,也可以從 Broker 從節(jié)點(diǎn)拉取。
在 RocketMQ 的源碼中,拉模式有兩個(gè)消費(fèi)者相關(guān)的類,其中 DefaultMQPullCons umer 類已經(jīng)被廢棄,官方推薦使用 Defau ltLitePullConsumer 類。下面代碼來(lái)自官方示例:
public static void main(String[] args) throws Exception {
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("lite_pull_consumer_test");
litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
litePullConsumer.subscribe("TopicTest", "*");
//啟動(dòng)方法
litePullConsumer.start();
try {
while (running) {
//這里可以看到,PULL 模式下消費(fèi)者需要業(yè)務(wù)代碼主動(dòng)去拉取消息
List messageExts = litePullConsumer.poll();
System.out.printf("%s%n", messageExts);
}
} finally {
litePullConsumer.shutdown();
}
} 上面代碼中消費(fèi)者屬于消費(fèi)組 lite_pull _consumer_test,訂閱了【TopicTest 】這個(gè) Topic 下的所有 tag。下面一起看一下啟動(dòng)方法。下圖是消費(fèi)者啟動(dòng)過(guò)程中類調(diào)用關(guān)系圖,圖中心的 pullRequestQueu e 是核心,pull 請(qǐng)求會(huì)先發(fā)送到這個(gè)隊(duì)列,然后循環(huán)地拉取處理。
檢查啟動(dòng)配置
消費(fèi)者啟動(dòng)時(shí)首先會(huì)檢查配置,檢查的配置項(xiàng)如下:
- 消費(fèi)組名稱是否合法。包括校驗(yàn)項(xiàng)包括【非空】、【長(zhǎng)度小于等于255】、符合正則表達(dá)式【^[%|a-zA-Z0-9_-]+$】、【不等于 “DEFAULT_CO NSUMER”】。
- 消息模式不能是空,包括集群和廣播兩種模式。
- MessageQueue 負(fù)載策略不能是空,包括:平均分配策略、循環(huán)分配策略、自定義分配策略、按照機(jī)房平均分配策略、按照機(jī)房就近分配策略、一致性 HASH 策略。
- 長(zhǎng)輪詢模式下,消費(fèi)者連接掛起時(shí)間不小于長(zhǎng)輪詢模式下 Broker 掛起時(shí)間,Broker 掛起時(shí)間默認(rèn) 20s,官方不建議修改。
這部分源代碼見 DefaultLitePullConsum erImpl#checkConfig。
修改消費(fèi)者實(shí)例名稱
如果是集群模式,實(shí)例名稱改為【進(jìn)程 ID + “ #” + 系統(tǒng)時(shí)間(納秒 )】,代碼如下:
//ClientConfig類
public void changeInstanceNameToPID() {
if (this.instanceName.equals("DEFAULT")) {
this.instanceName = UtilAll.getPid() + "#" + System.nanoTime();
}
初始化 MQ 客戶端
創(chuàng)建一個(gè) MQClientInstance 實(shí)例,然后把消費(fèi)者注冊(cè)到 MQClientInstance。
private void initMQClientFactory() throws MQClientException {
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultLitePullConsumer, this.rpcHook);
boolean registerOK = mQClientFactory.registerConsumer(this.defaultLitePullConsumer.getConsumerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The consumer group[" + this.defaultLitePullConsumer.getConsumerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
}
初始化負(fù)載均衡器
對(duì) RebalanceLitePullImpl 實(shí)例初始化,給下面的參數(shù)賦值:
- 消費(fèi)者名稱。
- 消息模型。
- MessageQueue 負(fù)載均衡策略。
- MQ 客戶端,上節(jié)中初始化的 MQClientInstance 實(shí)例。
負(fù)載均衡線程啟動(dòng)后,默認(rèn)每 20s 做一次負(fù)載均衡,見如下代碼:
//RebalanceService 類
public void run() {
while (!this.isStopped()) {
//waitInterval 默認(rèn) 20s,可以配置
this.waitForRunning(waitInterval);
this.mqClientFactory.doRebalance();
}
}
初始化 Wrapper
PullAPIWrapper 這個(gè) Wrapper 類是 MQ-ClientInstance 類的 Wrapper 類,類中 pullKernelImpl 方法對(duì) MQClientInstance 類中的 pullMessage 方法進(jìn)行了裝飾,這個(gè)裝飾類主要增加了下面功能:
- 獲取 Broker 地址。
- 檢查 RocketMQ 版本。
- 如果 Broker 是從節(jié)點(diǎn),把 sysFlag 標(biāo)記偏移量的位改為 0,(偏移量 0x1)。
- 封裝請(qǐng)求 header。
- 獲取 filterServer 地址(如果消費(fèi)者是通過(guò) filterServer 從 Broker 拉取消息,這里隨機(jī)獲取一個(gè) filterServer 地址)。
代碼如下 :
//PullAPIWrapper
public PullResult pullKernelImpl(
//省略所有參數(shù)
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
//1.獲取 Broker 地址
FindBrokerResult findBrokerResult =
this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
this.recalculatePullFromWhichNode(mq), false);
//省略從 Name sever 更新本地 Broker 緩存邏輯
if (findBrokerResult != null) {
{
//2.檢查 RocketMQ 版本
if (!ExpressionType.isTagType(expressionType)
&& findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {
throw new MQClientException("The broker[" + mq.getBrokerName() + ", "
+ findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);
}
}
int sysFlagInner = sysFlag;
if (findBrokerResult.isSlave()) {
//3.把偏移量的位改為 0,(偏移量 0x1)
sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
}
//4.封裝請(qǐng)求 header
PullMessageRequestHeader = new PullMessageRequestHeader();
//省略封裝 requestHeader
String brokerAddr = findBrokerResult.getBrokerAddr();
if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
//5.獲取 filterServer 地址
brokerAddr = computePullFromWhichFilterServer(mq.getTopic(), brokerAddr);
}
PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
brokerAddr,
requestHeader,
timeoutMillis,
communicationMode,
pullCallback);
return pullResult;
}
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
初始化 offset 存儲(chǔ)器
offset 存儲(chǔ)器的 UML 類圖如下:
有兩個(gè)實(shí)現(xiàn)類分別對(duì)應(yīng)集群模式和廣播模式,本文討論的集群模式的實(shí)現(xiàn)類是 RemoteBrokerOffsetStore。offset 可以存儲(chǔ)在本地或者遠(yuǎn)端服務(wù)器。
啟動(dòng) MQ 客戶端
啟動(dòng) MQ 客戶端主要包括如下步驟:
- 把 serviceState 改為 START_FAIL ED。
- 初始化 Netty channel。
- 啟動(dòng)定時(shí)任務(wù),包括定時(shí)獲取 Name Server 地址、從 Name Server 更新 Topic 路由信息、清理過(guò)期的 Broker、向 Broker 發(fā)送心跳、持久化 offset、定時(shí)調(diào)整線程池的數(shù)量(源碼里面這個(gè)并沒有實(shí)現(xiàn)邏輯)。
- 啟動(dòng)拉取消息的線程,拉取線程的邏輯是從請(qǐng)求隊(duì)列中不停地取出 pull 請(qǐng)求,然后將請(qǐng)求發(fā)送到 Broker 進(jìn)行拉取消息,代碼如下:
//PullMessageService類
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
PullRequest pullRequest = this.pullRequestQueue.take();
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
log.info(this.getServiceName() + " service end");
}
從下面的代碼可以看出,PULL 拉取消息最終使用了 DefaultMQPushConsumer Impl,所以 PULL 模式和 PUSH 模式拉取消息的邏輯是一樣的。
private void pullMessage(final PullRequest pullRequest) {
final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
if (consumer != null) {
DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
impl.pullMessage(pullRequest);
} else {
log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
}
}5.啟動(dòng) MessageQueue 負(fù)載均衡線程。
6.啟動(dòng)生產(chǎn)者線程;7.把 serviceState 改為 Running。
7.源碼參考 MQClientInstance#start。
啟動(dòng)定時(shí)任務(wù)
這個(gè)定時(shí)任務(wù)默認(rèn)每 30s 執(zhí)行一次,用于監(jiān)聽每個(gè) Topic 下的 MessageQueue 是否發(fā)生變化。代碼見 startScheduleTask 方法。
啟動(dòng)軌跡消息
軌跡消息主要用于跟蹤消息發(fā)送、消息消費(fèi)的軌跡,用于記錄詳細(xì)日志。代碼如下:
//AsyncTraceDispatcher 類
public void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException {
if (isStarted.compareAndSet(false, true)) {
traceProducer.setNamesrvAddr(nameSrvAddr);
traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
traceProducer.start();
}
this.accessChannel = accessChannel;
this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);
this.worker.setDaemon(true);
this.worker.start();
this.registerShutDownHook();
}
這里不詳細(xì)展開了,后面再單獨(dú)討論。
總結(jié)
本文通過(guò)源碼分析講解了 RocketMQ 中 PULL 模式下的消費(fèi)者啟動(dòng)過(guò)程,在生產(chǎn)上使用比較多的還是 PUSH 模式,PULL 模式拉取消息的方法跟 PUSH 模式一樣,不同的是 PULL 模式需要應(yīng)用程序進(jìn)行拉取動(dòng)作,可以通過(guò) PULL 模式的學(xué)習(xí)更容易的理解 PUSH 模式。最后,分析一個(gè) PULL 模式啟動(dòng)過(guò)程涉及的 UML 類圖:
網(wǎng)站標(biāo)題:五張圖帶你理解RocketMQ消費(fèi)者啟動(dòng)過(guò)程
瀏覽地址:http://www.5511xx.com/article/djiggip.html


咨詢
建站咨詢
