日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
SpringCloudStream使用詳解及部分重點(diǎn)源碼分析

環(huán)境:Springboot2.3.12.RELEASE + Spring Cloud Hoxton.SR12 + RabbitMQ3.8.12

專注于為中小企業(yè)提供網(wǎng)站制作、網(wǎng)站設(shè)計(jì)服務(wù),電腦端+手機(jī)端+微信端的三站合一,更高效的管理,為中小企業(yè)汕城免費(fèi)做網(wǎng)站提供優(yōu)質(zhì)的服務(wù)。我們立足成都,凝聚了一批互聯(lián)網(wǎng)行業(yè)人才,有力地推動(dòng)了上1000家企業(yè)的穩(wěn)健成長,幫助中小企業(yè)通過網(wǎng)站建設(shè)實(shí)現(xiàn)規(guī)模擴(kuò)充和轉(zhuǎn)變。

簡介

Spring Cloud Stream是一個(gè)框架,用于構(gòu)建與MQ連接的高度可伸縮的事件驅(qū)動(dòng)微服務(wù)。其目的是為了簡化消息在 Spring Cloud 應(yīng)用程序中的開發(fā)。屏蔽了各種MQ之間的差異,使得在更換MQ的時(shí)候不需要修改代碼。

Spring Cloud Stream支持多種綁定器實(shí)現(xiàn),如下:

  • RabbitMQ。
  • Apache Kafka。
  • Kafka Streams。
  • Amazon Kinesis。
  • Google PubSub (partner maintained)。
  • Solace PubSub+ (partner maintained)。
  • Azure Event Hubs (partner maintained)。
  • AWS SQS (partner maintained)。
  • AWS SNS (partner maintained)。
  • Apache RocketMQ (partner maintained)。

詳細(xì)查看官方文檔,對(duì)應(yīng)每一個(gè)MQ都有一個(gè)Github地址。

Spring Cloud Stream的核心構(gòu)建塊是:

  • 目標(biāo)綁定器(Destination Binders):負(fù)責(zé)與MQ集成的組件。
  • 目標(biāo)綁定(Destination Bindings):MQ中間件與最終用戶提供的應(yīng)用程序代碼(生產(chǎn)者/消費(fèi)者)之間的橋梁。
  • 消息(Message):生產(chǎn)者和消費(fèi)者用來與目標(biāo)綁定器(以及通過MQ與其他應(yīng)用程序)通信的規(guī)范數(shù)據(jù)結(jié)構(gòu)。

Stream 核心組件關(guān)系圖

快速入門

依賴:


Hoxton.SR12



org.springframework.boot
spring-boot-starter-amqp


org.springframework.cloud
spring-cloud-starter-stream-rabbit





org.springframework.cloud
spring-cloud-dependencies
${spring-cloud.version}
pom
import


應(yīng)用配置:

spring:
rabbitmq:
host: localhost
virtual-host: bus
port: 5672
username: xxx
password: xxx
---
spring:
cloud:
stream:
bindings:
#自定義輸入輸出
myInput:
#指定輸入通道對(duì)應(yīng)的主題名
destination: demo
myOutput:
destination: demo

創(chuàng)建消息通道綁定的接口:

public interface StreamBinding { 
String INPUT = "myInput";
String OUTPUT = "myOutput";
@Input(StreamBinding.INPUT)
SubscribableChannel input();
@Output(StreamBinding.OUTPUT)
MessageChannel output();
}

通過 @Input和 @Output注解定義輸入通道和輸出通道名稱,這里的名稱與上面配置文件中的是對(duì)應(yīng)的。

當(dāng)定義輸出通道的時(shí)候,需要返回 MessageChannel 接口對(duì)象,該接口定義了向消息通道發(fā)送消息的方法;定義輸入通道時(shí),需要返回 SubscribableChannel 接口對(duì)象,該接口集成自 MessageChannel 接口,它定義了維護(hù)消息通道訂閱者的方法。

這里的Input,Output兩個(gè)方法容器會(huì)分別創(chuàng)建一個(gè)Bean對(duì)象。

創(chuàng)建消費(fèi)者:

@Component
@EnableBinding(value = {StreamBinding.class})
public class StreamReceiver {
private Logger logger = LoggerFactory.getLogger(StreamReceiver.class);
@StreamListener(StreamBinding.INPUT)
public void receive(String message) {
logger.info("接收到消息: {}", message);
}
}

@EnableBinding 注解用來指定一個(gè)或多個(gè)定義了 @Input 或 @Output 注解的接口,以此實(shí)現(xiàn)對(duì)消息通道(Channel)的綁定。上面我們通過 @EnableBinding(value = {StreamClient.class}) 綁定了 StreamClient 接口,該接口是我們自己實(shí)現(xiàn)的對(duì)輸入輸出消息通道綁定的定義。

@StreamListener,主要定義在方法上,作用是將被修飾的方法注冊(cè)為消息中間件上數(shù)據(jù)流的事件監(jiān)聽器,注解中的屬性值對(duì)應(yīng)了監(jiān)聽的消息通道名。上面我們將 receive 方法注冊(cè)為 myInput 消息通道的監(jiān)聽處理器,當(dāng)我們往這個(gè)消息通道發(fā)送信息的時(shí)候,receiver 方法會(huì)執(zhí)行。

消息發(fā)送接口:

@Resource
private StreamBinding streamBinding;
@GetMapping("/send")
public void send() {
streamBinding.output().send(MessageBuilder.withPayload("First Message...").build());
}

啟動(dòng)服務(wù):

查看RabbitMQ

自動(dòng)為我們創(chuàng)建了一個(gè)隊(duì)列,隊(duì)列的名稱是以我們?cè)谂渲梦募信渲玫拈_頭,后面是隨機(jī)生成的。這個(gè)隊(duì)列會(huì)自動(dòng)刪除AD,服務(wù)關(guān)閉后就自動(dòng)刪除隊(duì)列;Excl:排他的,存在該隊(duì)列就不會(huì)在創(chuàng)建了。

修改端口后,再啟動(dòng)一個(gè)服務(wù):

創(chuàng)建了2個(gè)隊(duì)列,使用其中一個(gè)發(fā)送消息:

兩個(gè)服務(wù)都收到了消息。

消費(fèi)者組

上面啟動(dòng)了2個(gè)服務(wù)都能收到消息,在集群的環(huán)境下這樣肯定會(huì)帶來問題,如果是業(yè)務(wù)方面的就會(huì)出現(xiàn)重復(fù)數(shù)據(jù),這時(shí)候我們可以通過設(shè)置分組的解決此問題。修改配置:

spring:
cloud:
stream:
bindings:
myInput:
#指定輸入通道對(duì)應(yīng)的主題名
destination: demo
#指定一個(gè)組;指定分組以后,不管你啟動(dòng)多少個(gè)實(shí)例,所有的實(shí)例都監(jiān)聽這一個(gè)隊(duì)列
#多個(gè)實(shí)例會(huì)輪詢的接收消息
group: g_test
myOutput:
destination: demo

再次啟動(dòng)服務(wù)后,兩個(gè)服務(wù)會(huì)輪詢的接收到消息。

啟動(dòng)服務(wù)后,兩個(gè)服務(wù)都同時(shí)監(jiān)聽同一個(gè)隊(duì)列。隊(duì)列也不是隨機(jī)生成的了,并且隊(duì)列是持久化的,服務(wù)斷開后隊(duì)列也不會(huì)自動(dòng)刪除。

消息分區(qū)

通過消費(fèi)組的設(shè)置,雖然能保證同一消息只被一個(gè)消費(fèi)者進(jìn)行接收和處理,但是對(duì)于特殊業(yè)務(wù)情況,除了要保證單一實(shí)例消費(fèi)之外,還希望那些具備相同特征的消息都能被同一個(gè)實(shí)例消費(fèi),這個(gè)就可以使用 Spring Cloud Stream 提供的消息分區(qū)功能。修改配置。

spring:
cloud:
stream:
bindings:
myInput:
#指定輸入通道對(duì)應(yīng)的主題名
destination: demo
#指定一個(gè)組;指定分組以后,不管你啟動(dòng)多少個(gè)實(shí)例,所有的實(shí)例都監(jiān)聽這一個(gè)隊(duì)列
#多個(gè)實(shí)例會(huì)輪詢的接收消息
group: g_test
consumer:
#通過該參數(shù)開啟消費(fèi)者分區(qū)功能
partitioned: true
myOutput:
destination: demo
producer:
#這里的配置也可以是SpEL表達(dá)式,比如:headers['partition']通過消息header獲取屬性
#這里會(huì)通過表達(dá)式及消息對(duì)象進(jìn)行計(jì)算得到一個(gè)Key,然后獲取key的hashCode
# 得到hashCode以后會(huì)與partitionCount進(jìn)行取模運(yùn)算得到具體的分區(qū)
partitionKeyExpression: '1' #我這里給的值就是對(duì)應(yīng)的instanceIndex的值,你希望誰接收就設(shè)置誰配置的值即可
partitionCount: 2
#實(shí)例總數(shù)
instanceCount: 2
#該參數(shù)設(shè)置了當(dāng)前實(shí)例的索引號(hào),從 0 開始
instanceIndex: 0

計(jì)算分區(qū)源碼:

最后得到分區(qū)信息后會(huì)在消息頭中放入一個(gè)scst_partition為key,partition為值的頭信息。

啟動(dòng)多個(gè)實(shí)例后,測(cè)試發(fā)現(xiàn)所有的消息都只是同一個(gè)實(shí)例收到消息。

交換機(jī)分別與每一個(gè)服務(wù)進(jìn)行綁定使用不同的Routing Key這樣在發(fā)送消息的時(shí)候就可以根據(jù)計(jì)算處理的分區(qū)進(jìn)行定向發(fā)送消息了。

通過源碼查看:

這里通過我們的配置交換機(jī)為demo。接著是獲取路由key了。

這里會(huì)從消息header中獲取key = scst_partition的頭信息。

這樣針對(duì)使用RabbitMQ的中間件發(fā)送消息所需要的交換機(jī)及路由key就確定下來了。


當(dāng)前文章:SpringCloudStream使用詳解及部分重點(diǎn)源碼分析
URL地址:http://www.5511xx.com/article/djgigoe.html