日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關咨詢
選擇下列產(chǎn)品馬上在線溝通
服務時間:8:30-17:00
你可能遇到了下面的問題
關閉右側工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
Rocketmq-Spring:實戰(zhàn)與源碼解析一網(wǎng)打盡

RocketMQ 是大家耳熟能詳?shù)南㈥犃校_源項目 rocketmq-spring 可以幫助開發(fā)者在 Spring Boot 項目中快速整合 RocketMQ。

創(chuàng)新互聯(lián)公司是一家專業(yè)提供色尼企業(yè)網(wǎng)站建設,專注與成都網(wǎng)站建設、成都網(wǎng)站制作、H5頁面制作、小程序制作等業(yè)務。10年已為色尼眾多企業(yè)、政府機構等服務。創(chuàng)新互聯(lián)專業(yè)的建站公司優(yōu)惠進行中。

這篇文章會介紹 Spring Boot 項目使用 rocketmq-spring SDK 實現(xiàn)消息收發(fā)的操作流程,同時筆者會從開發(fā)者的角度解讀 SDK 的設計邏輯。

一、SDK 簡介

項目地址:??https://github.com/apache/rocketmq-spring??

rocketmq-spring 的本質(zhì)是一個 Spring Boot starter 。

Spring Boot 基于“約定大于配置”(Convention over configuration)這一理念來快速地開發(fā)、測試、運行和部署 Spring 應用,并能通過簡單地與各種啟動器(如 spring-boot-web-starter)結合,讓應用直接以命令行的方式運行,不需再部署到獨立容器中。

Spring Boot starter 構造的啟動器使用起來非常方便,開發(fā)者只需要在 pom.xml 引入 starter 的依賴定義,在配置文件中編寫約定的配置即可。

下面我們看下 rocketmq-spring-boot-starter 的配置:

1、引入依賴


org.apache.rocketmq
rocketmq-spring-boot-starter
2.2.3

2、約定配置

接下來,我們分別按照生產(chǎn)者和消費者的順序,詳細的講解消息收發(fā)的操作過程。

二、生產(chǎn)者

首先我們添加依賴后,進行如下三個步驟:

1、配置文件中配置如下

rocketmq:
name-server: 127.0.0.1:9876
producer:
group: platform-sms-server-group
# access-key: myaccesskey
# secret-key: mysecretkey
topic: sms-common-topic

生產(chǎn)者配置非常簡單,主要配置名字服務地址和生產(chǎn)者組。

2、需要發(fā)送消息的類中注入 RcoketMQTemplate

@Autowired
private RocketMQTemplate rocketMQTemplate;

@Value("${rocketmq.topic}")
private String smsTopic;

3、發(fā)送消息,消息體可以是自定義對象,也可以是 Message 對象

rocketMQTemplate 類包含多鐘發(fā)送消息的方法:

  • 同步發(fā)送 syncSend
  • 異步發(fā)送 asyncSend
  • 順序發(fā)送 syncSendOrderly
  • oneway發(fā)送 sendOneWay

下面的代碼展示如何同步發(fā)送消息。

String destination = StringUtils.isBlank(tags) ? topic : topic + ":" + tags;
SendResult sendResult =
rocketMQTemplate.syncSend(
destination,
MessageBuilder.withPayload(messageContent).
setHeader(MessageConst.PROPERTY_KEYS, uniqueId).
build()
);
if (sendResult != null) {
if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
// send message success ,do something
}
}

syncSend 方法的第一個參數(shù)是發(fā)送的目標,格式是:topic + ":" + tags ,

第二個參數(shù)是:spring-message 規(guī)范的 message 對象 ,而 MessageBuilder 是一個工具類,方法鏈式調(diào)用創(chuàng)建消息對象。

三、消費者

1、配置文件中配置如下

rocketmq:
name-server: 127.0.0.1:9876
consumer1:
group: platform-sms-worker-common-group
topic: sms-common-topic

2、實現(xiàn)消息監(jiān)聽器

@Component
@RocketMQMessageListener(
consumerGroup = "${rocketmq.consumer1.group}", //消費組
topic = "${rocketmq.consumer1.topic}" //主題
)
public class SmsMessageCommonConsumer implements RocketMQListener {
public void onMessage(String message) {
System.out.println("普通短信:" + message);
}
}

消費者實現(xiàn)類也可以實現(xiàn) RocketMQListener, 在 onMessage 方法里通過 RocketMQ 原生消息對象 MessageExt 獲取更詳細的消息數(shù)據(jù) 。

public void onMessage(MessageExt message) {
try {
String body = new String(message.getBody(), "UTF-8");
logger.info("普通短信:" + message);
} catch (Exception e) {
logger.error("common onMessage error:", e);
}
}

四、源碼概覽

最新源碼中,我們可以看到源碼中包含四個模塊:

1、rocketmq-spring-boot-parent

該模塊是父模塊,定義項目所有依賴的 jar 包。

2、rocketmq-spring-boot

核心模塊,實現(xiàn)了 starter 的核心邏輯。

3、rocketmq-spring-boot-starter

SDK 模塊,簡單封裝,外部項目引用。

4、rocketmq-spring-boot-samples

示例代碼模塊。這個模塊非常重要,當用戶使用 SDK 時,可以參考示例快速開發(fā)。

五、starter 實現(xiàn)

我們重點分析下 rocketmq-spring-boot 模塊的核心源碼:

spring-boot-starter 實現(xiàn)需要包含如下三個部分:

1、定義 Spring 自身的依賴包和 RocketMQ 的依賴包 ;

2、定義spring.factories 文件

在 resources 包下創(chuàng)建 META-INF 目錄后,新建 spring.factories 文件,并在文件中定義自動加載類,文件內(nèi)容是:

org.springframework.boot.autoconfigure.EnableAutoCnotallow=\
org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration

spring boot 會根據(jù)文件中配置的自動化配置類來自動初始化相關的 Bean、Component 或 Service。

3、實現(xiàn)自動加載類

在 RocketMQAutoConfiguration 類的具體實現(xiàn)中,我們重點分析下生產(chǎn)者和消費者是如何分別啟動的。

▍生產(chǎn)者發(fā)送模板類:RocketMQTemplate

RocketMQAutoConfiguration 類定義了兩個默認的 Bean :

首先SpringBoot項目中配置文件中的配置值會根據(jù)屬性條件綁定到 RocketMQProperties 對象 中,然后使用 RocketMQ 的原生 API 分別創(chuàng)建生產(chǎn)者 Bean 和拉取消費者 Bean , 分別將兩個 bean 設置到 RocketMQTemplate 對象中。

兩個重點需要強調(diào):

  • 發(fā)送消息時,將 spring-message 規(guī)范下的消息對象封裝成 RocketMQ 消息對象

  • 默認拉取消費者 litePullConsumer 。拉取消費者一般用于大數(shù)據(jù)批量處理場景 。

  • 原生使用方式

RocketMQTemplate 類封裝了拉取消費者的receive方法,以方便開發(fā)者使用。

▍自定義消費者類

下圖是并發(fā)消費者的例子:

消費者示例代碼

那么 rocketmq-spring 是如何自動啟動消費者呢 ?

spring 容器首先注冊了消息監(jiān)聽器后置處理器,然后調(diào)用 ListenerContainerConfiguration 類的 registerContainer 方法 。

對比并發(fā)消費者的例子,我們可以看到:DefaultRocketMQListenerContainer 是對 DefaultMQPushConsumer 消費邏輯的封裝。

封裝消費消息的邏輯,同時滿足 RocketMQListener 泛化接口支持不同參數(shù),比如 String 、MessageExt 、自定義對象 。

首先DefaultRocketMQListenerContainer初始化之后, 獲取 onMessage 方法的參數(shù)類型 。

然后消費者調(diào)用 consumeMessage 處理消息時,封裝了一個 handleMessage 方法 ,將原生 RocketMQ 消息對象 MessageExt 轉(zhuǎn)換成 onMessage 方法定義的參數(shù)對象,然后調(diào)用 rocketMQListener 的 onMessage  方法。

mnjh9

上圖右側標紅的代碼也就是該方法的精髓:

rocketMQListener.onMessage(doConvertMessage(messageExt));

六、寫到最后

開源項目 rocketmq-spring 有很多值得學習的地方 ,我們可以從如下四個層面逐層進階:

1、學會如何使用 :參考 rocketmq-spring-boot-samples 模塊的示例代碼,學會如何發(fā)送和接收消息,快速編碼;

2、模塊設計:學習項目的模塊分層 (父模塊、SDK 模塊、核心實現(xiàn)模塊、示例代碼模塊);

3、starter 設計思路 :定義自動配置文件 spring.factories 、設計配置屬性類 、在 RocketMQ client 的基礎上實現(xiàn)優(yōu)雅的封裝、深入理解 RocketMQ 源碼等;

4、舉一反三:當我們理解了 rocketmq-spring 的源碼,我們可以嘗試模仿該項目寫一個簡單的 spring boot starter。


網(wǎng)站名稱:Rocketmq-Spring:實戰(zhàn)與源碼解析一網(wǎng)打盡
網(wǎng)址分享:http://www.5511xx.com/article/dhciccj.html