日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問(wèn)題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷解決方案
RabbitMQ客戶端源碼系列-Connection

前言

本次打算直接上干貨分享 RabbitMQ Java 客戶端一系列的源碼分析 (com.rabbitmq:amqp-client:4.8.3)。

為賽罕等地區(qū)用戶提供了全套網(wǎng)頁(yè)設(shè)計(jì)制作服務(wù),及賽罕網(wǎng)站建設(shè)行業(yè)解決方案。主營(yíng)業(yè)務(wù)為網(wǎng)站建設(shè)、成都網(wǎng)站制作、賽罕網(wǎng)站設(shè)計(jì),以傳統(tǒng)方式定制建設(shè)網(wǎng)站,并提供域名空間備案等一條龍服務(wù),秉承以專業(yè)、用心的態(tài)度為用戶提供真誠(chéng)的服務(wù)。我們深信只要達(dá)到每一位用戶的要求,就會(huì)得到認(rèn)可,從而選擇與我們長(zhǎng)期合作。這樣,我們也可以走得更遠(yuǎn)!

ps:最近接收到公司的任務(wù)就是閱讀和分析 spring-rabbit、amqp-client,因此打算一同和大家分享 amqp-client。由于 RabbitMQ 是 Erlang 語(yǔ)言開發(fā)(暫時(shí)沒(méi)有對(duì)這塊分享的計(jì)劃)。

友情提醒:本次分享適合的人群,需要對(duì) RabbitMQ 有一定的了解。

  • RabbitMQ Getstarted: https://www.rabbitmq.com/#getstarted。
  • Java Client API Guide: https://www.rabbitmq.com/api-guide.html。

廢話不多話,開整!

Java Client Connection Demo

我們先看一個(gè)官網(wǎng)提供的 Java Client Connecting to RabbitMQ Demo。

ConnectionFactory factory = new ConnectionFactory();
// "guest"/"guest" by default, limited to localhost connections
factory.setUsername(userName);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setHost(hostName);
factory.setPort(portNumber);
Connection conn = factory.newConnection();
Channel channel = connection.createChannel();
byte[] messageBodyBytes = "Hello, world!".getBytes();
channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);
channel.close();
connection.close();

AMQP 協(xié)議交互流程

已經(jīng)使用過(guò) RabbitMQ 的同學(xué)相信已經(jīng)不陌生,因此就簡(jiǎn)單的描述下:與 RabbitMQ Broker 建立 Connection 和 Channel,發(fā)送消息后,關(guān)閉 Connection 和 Channel 的過(guò)程。下圖是 針對(duì)這個(gè)過(guò)程使用 Wireshark 抓包查看整個(gè) AMQP 協(xié)議的交互流程(172.30.0.74 為客戶端即本機(jī) ip;192.168.17.160 為 RabbitMQ Broker 的 ip)。

「client 與 broker 創(chuàng)建Connection、Channel、發(fā)送消息」

「client 與 broker 發(fā)送心跳(Heartbeat)、關(guān)閉Connection、Channel」

為了讓讀者更容易看得源碼,我先給大家描述下 client 與 broker 之間 AMQP 協(xié)議的交互流程描述(AMQP 協(xié)議中 不少命令都是成對(duì)存在的,抓包協(xié)議中 Info 里的命令是 -,而代碼里的是 駝峰式 此處以代碼為準(zhǔn)):

  1. 將 AMQP 0-9-1 的連接頭寫入底層套接字,包含指定的版本信息(客戶端告訴 broker 自己使用的協(xié)議及版本,底層使用 java 自帶的 socket)。
  2. 客戶端等待 broker 發(fā)送的 Connection.Start (broker 告訴客戶端 通信的協(xié)議和版本、SASL認(rèn)證機(jī)制(詳細(xì)見)、語(yǔ)言環(huán)境以及RabbitMQ的版本信息和支持能力)。
  3. 客戶端接收后 發(fā)送 Connection.StartOk (客戶端告訴 broker 連接使用的帳號(hào)和密碼、認(rèn)證機(jī)制、語(yǔ)言環(huán)境、客戶的信息以及能力)。
  4. 客戶端等待 broker 發(fā)送的 Connection.Tune (broker 與 客戶端 進(jìn)行參數(shù)協(xié)商)。
  5. 客戶端接收后 發(fā)送 Connection.TuneOk (客戶端 參數(shù) [ChannelMax、FrameMax、Heartbeat] 協(xié)商完成后告訴 broker)。
  6. 客戶端發(fā)送 Connection.Open (客戶端 告訴 broker 打開一個(gè)連接,并請(qǐng)求設(shè)置_virtualHost [vhost])。
  7. broker 接收到后返回 Connection.OpenOk (client 對(duì) vhost 進(jìn)行驗(yàn)證,成功則返回如下此信息)。
  8. 客戶端發(fā)送 Channel.Open,broker 接收到后返回 Channel.OpenOk (客戶端 創(chuàng)建通道;broker 收到并創(chuàng)建通道完成)。
  9. 客戶端發(fā)送 Confirm.Select,broker 接收到后返回 Confirm.SelectOk(客戶端告訴 broker 消息需要使用 confirm的機(jī)制,broker收到并回復(fù))。。
  10. 客戶端發(fā)送消息 Basic.Publish,broker 應(yīng)答返回 Basic.Ack。
  11. 期間 客戶端和 broker 會(huì)相互檢查彼此的心跳 heartbeat。
  12. 客戶端 關(guān)閉通道 Channel.Close,broker 應(yīng)答返回 Channel.CloseOk。
  13. 客戶端 關(guān)閉連接 Connection.Close,broker 應(yīng)答返回 Connection.CloseOk。

源碼分析

熟悉完AMQP 協(xié)議的交互流程易于后續(xù)理解源碼,開始本次主要介紹 Connection 相關(guān)的源碼:ConnectionFactory.newConnection --> AMQConnection.start。

「ConnectionFactory.newConnection()」

public Connection newConnection(ExecutorService executor, AddressResolver addressResolver, String clientProvidedName)
throws IOException, TimeoutException {
if(this.metricsCollector == null) {
this.metricsCollector = new NoOpMetricsCollector();
}
// make sure we respect the provided thread factory
// 創(chuàng)建 socketFactory 和 初始化相應(yīng)的配置
FrameHandlerFactory fhFactory = createFrameHandlerFactory();
// 初始化 Connection 涉及到的參數(shù)
ConnectionParams params = params(executor);
// set client-provided via a client property
if (clientProvidedName != null) {
Map properties = new HashMap(params.getClientProperties());
properties.put("connection_name", clientProvidedName);
params.setClientProperties(properties);
}
// 這塊邏輯屬于 rabbit提供自動(dòng)回復(fù)連接的邏輯
if (isAutomaticRecoveryEnabled()) {
// see com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory#newConnection
AutorecoveringConnection conn = new AutorecoveringConnection(params, fhFactory, addressResolver, metricsCollector);
conn.init();
return conn;
} else {
List
addrs = addressResolver.getAddresses();
Exception lastException = null;
for (Address addr : addrs) {
try {
// 創(chuàng)建、連接 socket 并封裝成 返回 SocketFrameHandler (socket 不采用Negale算法[Negale算法,大家有興趣可以了解下這塊針對(duì)socket緩存性能的優(yōu)化])
FrameHandler handler = fhFactory.create(addr);
// 初始化配置、_channel0、_channelManager等等
AMQConnection conn = createConnection(params, handler, metricsCollector);
// 啟動(dòng) AMQConnection 后續(xù)會(huì)進(jìn)行詳細(xì)介紹
conn.start();
this.metricsCollector.newConnection(conn);
return conn;
} catch (IOException e) {
lastException = e;
} catch (TimeoutException te) {
lastException = te;
}
}
if (lastException != null) {
if (lastException instanceof IOException) {
throw (IOException) lastException;
} else if (lastException instanceof TimeoutException) {
throw (TimeoutException) lastException;
}
}
throw new IOException("failed to connect");
}
}

AMQP 協(xié)議的交互流程中 1~6 的邏輯屬于 AMQConnection.start() 的重點(diǎn)邏輯,也是本次給大家主要介紹的點(diǎn)。

public void start()
throws IOException, TimeoutException {
// 初始化工作線程
initializeConsumerWorkService();
// 初始化心跳發(fā)送
initializeHeartbeatSender();
// 將 Connection標(biāo)志位 啟動(dòng)
this._running = true;
// 確認(rèn)客戶端 第一件事 發(fā)送header頭部協(xié)議
AMQChannel.SimpleBlockingRpcContinuation connStartBlocker =
new AMQChannel.SimpleBlockingRpcContinuation();
// 進(jìn)入Rpc隊(duì)列進(jìn)行阻塞,等待broker返回 connection.start method
_channel0.enqueueRpc(connStartBlocker);
try {
// The following two lines are akin to AMQChannel's
// transmit() method for this pseudo-RPC.
_frameHandler.setTimeout(handshakeTimeout);
// 1. 發(fā)送header頭部協(xié)議 AMQP 0-9-1
_frameHandler.sendHeader();
} catch (IOException ioe) {
_frameHandler.close();
throw ioe;
}
// 初始化啟動(dòng) startMainLoop -- 為了接收和處理broker發(fā)送的消息
this._frameHandler.initialize(this);

AMQP.Connection.Start connStart;
AMQP.Connection.Tune connTune = null;
try {
// 2. 客戶端等待 broker 發(fā)送的 Connection.Start
connStart =
(AMQP.Connection.Start) connStartBlocker.getReply(handshakeTimeout/2).getMethod();

// 通信的協(xié)議和版本、SASL認(rèn)證機(jī)制(詳細(xì)見)、語(yǔ)言環(huán)境以及RabbitMQ的版本信息和支持能力
_serverProperties = Collections.unmodifiableMap(connStart.getServerProperties());

Version serverVersion =
new Version(connStart.getVersionMajor(),
connStart.getVersionMinor());

// 版本比對(duì)
if (!Version.checkVersion(clientVersion, serverVersion)) {
throw new ProtocolVersionMismatchException(clientVersion,
serverVersion);
}

String[] mechanisms = connStart.getMechanisms().toString().split(" ");
SaslMechanism sm = this.saslConfig.getSaslMechanism(mechanisms);
if (sm == null) {
throw new IOException("No compatible authentication mechanism found - " +
"server offered [" + connStart.getMechanisms() + "]");
}
String username = credentialsProvider.getUsername();
String password = credentialsProvider.getPassword();
LongString challenge = null;
LongString response = sm.handleChallenge(null, username, password);
do {
// 3. 客戶端接收后 發(fā)送 `Connection.StartOk`
Method method = (challenge == null)
? new AMQP.Connection.StartOk.Builder()
.clientProperties(_clientProperties)
.mechanism(sm.getName())
.response(response)
.build()
: new AMQP.Connection.SecureOk.Builder().response(response).build();
try {
Method serverResponse = _channel0.rpc(method, handshakeTimeout/2).getMethod();
if (serverResponse instanceof AMQP.Connection.Tune) {
// 4. 客戶端等待 broker 發(fā)送的 Connection.Tune
connTune = (AMQP.Connection.Tune) serverResponse;
} else {
challenge = ((AMQP.Connection.Secure) serverResponse).getChallenge();
response = sm.handleChallenge(challenge, username, password);
}
} catch (ShutdownSignalException e) {
Method shutdownMethod = e.getReason();
if (shutdownMethod instanceof AMQP.Connection.Close) {
AMQP.Connection.Close shutdownClose = (AMQP.Connection.Close) shutdownMethod;
if (shutdownClose.getReplyCode() == AMQP.ACCESS_REFUSED) {
throw new AuthenticationFailureException(shutdownClose.getReplyText());
}
}
throw new PossibleAuthenticationFailureException(e);
}
} while (connTune == null);
} catch (TimeoutException te) {
_frameHandler.close();
throw te;
} catch (ShutdownSignalException sse) {
_frameHandler.close();
throw AMQChannel.wrap(sse);
} catch(IOException ioe) {
_frameHandler.close();
throw ioe;
}
try {
// 最大通道數(shù)
int channelMax =
negotiateChannelMax(this.requestedChannelMax,
connTune.getChannelMax());
_channelManager = instantiateChannelManager(channelMax, threadFactory);
// 幀最大的大小
int frameMax =
negotiatedMaxValue(this.requestedFrameMax,
connTune.getFrameMax());
this._frameMax = frameMax;
// 心跳
int heartbeat =
negotiatedMaxValue(this.requestedHeartbeat,
connTune.getHeartbeat());
setHeartbeat(heartbeat);

// 5. 客戶端接收后 發(fā)送 Connection.TuneOk
_channel0.transmit(new AMQP.Connection.TuneOk.Builder()
.channelMax(channelMax)
.frameMax(frameMax)
.heartbeat(heartbeat)
.build());
// 6. 客戶端發(fā)送 Channel.Open
_channel0.exnWrappingRpc(new AMQP.Connection.Open.Builder()
.virtualHost(_virtualHost)
.build());
} catch (IOException ioe) {
_heartbeatSender.shutdown();
_frameHandler.close();
throw ioe;
} catch (ShutdownSignalException sse) {
_heartbeatSender.shutdown();
_frameHandler.close();
throw AMQChannel.wrap(sse);
}
// We can now respond to errors having finished tailoring the connection
this._inConnectionNegotiation = false;
}

最后

本次分享的目的,先讓讀者對(duì)于 RabbitMQ Client 與 RabbitMQ Broker 根據(jù) AMQP 協(xié)議交互流程有個(gè)大體的認(rèn)識(shí),并根據(jù)分析 Connection 源碼有一定認(rèn)知,其中還有很多 Connection 細(xì)節(jié)源碼需要讀者慢慢體會(huì)。


標(biāo)題名稱:RabbitMQ客戶端源碼系列-Connection
地址分享:http://www.5511xx.com/article/dhecied.html