日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢(xún)
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問(wèn)題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷(xiāo)解決方案
聊聊Netty那些事兒之Reactor在Netty中的實(shí)現(xiàn)(創(chuàng)建篇)

本系列Netty源碼解析文章基于 4.1.56.Final版本

10多年的隨州網(wǎng)站建設(shè)經(jīng)驗(yàn),針對(duì)設(shè)計(jì)、前端、開(kāi)發(fā)、售后、文案、推廣等六對(duì)一服務(wù),響應(yīng)快,48小時(shí)及時(shí)工作處理。營(yíng)銷(xiāo)型網(wǎng)站建設(shè)的優(yōu)勢(shì)是能夠根據(jù)用戶(hù)設(shè)備顯示端的尺寸不同,自動(dòng)調(diào)整隨州建站的顯示方式,使網(wǎng)站能夠適用不同顯示終端,在瀏覽器中調(diào)整網(wǎng)站的寬度,無(wú)論在任何一種瀏覽器上瀏覽網(wǎng)站,都能展現(xiàn)優(yōu)雅布局與設(shè)計(jì),從而大程度地提升瀏覽體驗(yàn)。創(chuàng)新互聯(lián)從事“隨州網(wǎng)站設(shè)計(jì)”,“隨州網(wǎng)站推廣”以來(lái),每個(gè)客戶(hù)項(xiàng)目都認(rèn)真落實(shí)執(zhí)行。

在上篇文章??《聊聊Netty那些事兒之從內(nèi)核角度看IO模型》??中我們花了大量的篇幅來(lái)從內(nèi)核角度詳細(xì)講述了五種IO模型的演進(jìn)過(guò)程以及ReactorIO線程模型的底層基石IO多路復(fù)用技術(shù)在內(nèi)核中的實(shí)現(xiàn)原理。

最后我們引出了netty中使用的主從Reactor IO線程模型。

通過(guò)上篇文章的介紹,我們已經(jīng)清楚了在IO調(diào)用的過(guò)程中內(nèi)核幫我們搞了哪些事情,那么俗話說(shuō)的好內(nèi)核領(lǐng)進(jìn)門(mén),修行在netty,netty在用戶(hù)空間又幫我們搞了哪些事情?

那么從本文開(kāi)始,筆者將從源碼角度來(lái)帶大家看下上圖中的Reactor IO線程模型在Netty中是如何實(shí)現(xiàn)的。

本文作為Reactor在Netty中實(shí)現(xiàn)系列文章中的開(kāi)篇文章,筆者先來(lái)為大家介紹Reactor的骨架是如何創(chuàng)建出來(lái)的。

在上篇文章中我們提到Netty采用的是主從Reactor多線程的模型,但是它在實(shí)現(xiàn)上又與Doug Lea在Scalable IO in Java論文中提到的經(jīng)典主從Reactor多線程模型有所差異。

Netty中的Reactor是以Group的形式出現(xiàn)的,主從Reactor在Netty中就是主從Reactor組,每個(gè)Reactor Group中會(huì)有多個(gè)Reactor用來(lái)執(zhí)行具體的IO任務(wù)。當(dāng)然在netty中Reactor不只用來(lái)執(zhí)行IO任務(wù),這個(gè)我們后面再說(shuō)。

  • Main Reactor Group中的Reactor數(shù)量取決于服務(wù)端要監(jiān)聽(tīng)的端口個(gè)數(shù),通常我們的服務(wù)端程序只會(huì)監(jiān)聽(tīng)一個(gè)端口,所以Main Reactor Group只會(huì)有一個(gè)Main Reactor線程來(lái)處理最重要的事情:綁定端口地址,接收客戶(hù)端連接,為客戶(hù)端創(chuàng)建對(duì)應(yīng)的SocketChannel,將客戶(hù)端SocketChannel分配給一個(gè)固定的Sub Reactor。也就是上篇文章筆者為大家舉的例子,飯店最重要的工作就是先把客人迎接進(jìn)來(lái)?!拔壹掖箝T(mén)常打開(kāi),開(kāi)放懷抱等你,擁抱過(guò)就有了默契你會(huì)愛(ài)上這里......”

  • Sub Reactor Group里有多個(gè)Reactor線程,Reactor線程的個(gè)數(shù)可以通過(guò)系統(tǒng)參數(shù)-D io.netty.eventLoopThreads指定。默認(rèn)的Reactor的個(gè)數(shù)為CPU核數(shù) * 2。Sub Reactor線程主要用來(lái)輪詢(xún)客戶(hù)端SocketChannel上的IO就緒事件,處理IO就緒事件,執(zhí)行異步任務(wù)。Sub Reactor Group做的事情就是上篇飯店例子中服務(wù)員的工作,客人進(jìn)來(lái)了要為客人分配座位,端茶送水,做菜上菜。“不管遠(yuǎn)近都是客人,請(qǐng)不用客氣,相約好了在一起,我們歡迎您......”

一個(gè)客戶(hù)端SocketChannel只能分配給一個(gè)固定的Sub Reactor。一個(gè)Sub Reactor負(fù)責(zé)處理多個(gè)客戶(hù)端SocketChannel,這樣可以將服務(wù)端承載的全量客戶(hù)端連接分?jǐn)偟蕉鄠€(gè)Sub Reactor中處理,同時(shí)也能保證客戶(hù)端SocketChannel上的IO處理的線程安全性。

由于文章篇幅的關(guān)系,作為Reactor在netty中實(shí)現(xiàn)的第一篇我們主要來(lái)介紹主從Reactor Group的創(chuàng)建流程,骨架脈絡(luò)先搭好。

下面我們來(lái)看一段Netty服務(wù)端代碼的編寫(xiě)模板,從代碼模板的流程中我們來(lái)解析下主從Reactor的創(chuàng)建流程以及在這個(gè)過(guò)程中所涉及到的Netty核心類(lèi)。

Netty服務(wù)端代碼模板

/**
* Echoes back any received data from a client.
*/
public final class EchoServer {
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception {
// Configure the server.
//創(chuàng)建主從Reactor線程組
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)//配置主從Reactor
.channel(NioServerSocketChannel.class)//配置主Reactor中的channel類(lèi)型
.option(ChannelOption.SO_BACKLOG, 100)//設(shè)置主Reactor中channel的option選項(xiàng)
.handler(new LoggingHandler(LogLevel.INFO))//設(shè)置主Reactor中Channel->pipline->handler
.childHandler(new ChannelInitializer() {//設(shè)置從Reactor中注冊(cè)channel的pipeline
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
// Start the server. 綁定端口啟動(dòng)服務(wù),開(kāi)始監(jiān)聽(tīng)accept事件
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

首先我們要?jiǎng)?chuàng)建Netty最核心的部分 -> 創(chuàng)建主從Reactor Group,在Netty中EventLoopGroup就是Reactor Group的實(shí)現(xiàn)類(lèi)。對(duì)應(yīng)的EventLoop就是Reactor的實(shí)現(xiàn)類(lèi)。

  //創(chuàng)建主從Reactor線程組
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();

創(chuàng)建用于IO處理的ChannelHandler,實(shí)現(xiàn)相應(yīng)IO事件的回調(diào)函數(shù),編寫(xiě)對(duì)應(yīng)的IO處理邏輯。注意這里只是簡(jiǎn)單示例哈,詳細(xì)的IO事件處理,筆者會(huì)單獨(dú)開(kāi)一篇文章專(zhuān)門(mén)講述。

final EchoServerHandler serverHandler = new EchoServerHandler();
/**
* Handler implementation for the echo server.
*/
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
................省略IO處理邏輯................
ctx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {

ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}

創(chuàng)建ServerBootstrapNetty服務(wù)端啟動(dòng)類(lèi),并在啟動(dòng)類(lèi)中配置啟動(dòng)Netty服務(wù)端所需要的一些必備信息。

在上篇文章介紹Socket內(nèi)核結(jié)構(gòu)小節(jié)中我們提到,在編寫(xiě)服務(wù)端網(wǎng)絡(luò)程序時(shí),我們首先要?jiǎng)?chuàng)建一個(gè)Socket用于listen和bind端口地址,我們把這個(gè)叫做監(jiān)聽(tīng)Socket,這里對(duì)應(yīng)的就是NioServerSocketChannel.class。當(dāng)客戶(hù)端連接完成三次握手,系統(tǒng)調(diào)用accept函數(shù)會(huì)基于監(jiān)聽(tīng)Socket創(chuàng)建出來(lái)一個(gè)新的Socket專(zhuān)門(mén)用于與客戶(hù)端之間的網(wǎng)絡(luò)通信我們稱(chēng)為客戶(hù)端連接Socket,這里對(duì)應(yīng)的就是NioSocketChannel.class。

netty有兩種Channel類(lèi)型:一種是服務(wù)端用于監(jiān)聽(tīng)綁定端口地址的NioServerSocketChannel,一種是用于客戶(hù)端通信的NioSocketChannel。每種Channel類(lèi)型實(shí)例都會(huì)對(duì)應(yīng)一個(gè)PipeLine用于編排對(duì)應(yīng)channel實(shí)例上的IO事件處理邏輯。PipeLine中組織的就是ChannelHandler用于編寫(xiě)特定的IO處理邏輯。

注意:serverBootstrap.handler設(shè)置的是服務(wù)端NioServerSocketChannel PipeLine中的ChannelHandler。

  • ServerBootstrap啟動(dòng)類(lèi)方法帶有child前綴的均是設(shè)置客戶(hù)端NioSocketChannel屬性的。
  • ChannelInitializer是用于當(dāng)SocketChannel成功注冊(cè)到綁定的Reactor上后,用于初始化該SocketChannel的Pipeline。它的initChannel方法會(huì)在注冊(cè)成功后執(zhí)行。這里只是捎帶提一下,讓大家有個(gè)初步印象,后面我會(huì)專(zhuān)門(mén)介紹。
  • serverBootstrap.childHandler(ChannelHandler childHandler)用于設(shè)置客戶(hù)端NioSocketChannel中對(duì)應(yīng)Pipieline中的ChannelHandler。我們通常配置的編碼解碼器就是在這里。
  • serverBootstrap.option(ChannelOption.SO_BACKLOG, 100)設(shè)置服務(wù)端ServerSocketChannel中的SocketOption。關(guān)于SocketOption的選項(xiàng)我們后邊的文章再聊,本文主要聚焦在Netty Main Reactor Group的創(chuàng)建及工作流程。
  • serverBootstrap.handler(....)設(shè)置服務(wù)端NioServerSocketChannel中對(duì)應(yīng)Pipieline中的ChannelHandler。
  • 通過(guò)serverBootstrap.group(bossGroup, workerGroup)為Netty服務(wù)端配置主從Reactor Group實(shí)例。
  • 通過(guò)serverBootstrap.channel(NioServerSocketChannel.class)配置Netty服務(wù)端的ServerSocketChannel用于綁定端口地址以及創(chuàng)建客戶(hù)端SocketChannel。Netty中的NioServerSocketChannel.class就是對(duì)JDK NIO中ServerSocketChannel的封裝。而用于表示客戶(hù)端連接的NioSocketChannel是對(duì)JDK NIO SocketChannel封裝。

ChannelFuture f = serverBootstrap.bind(PORT).sync()這一步會(huì)是下篇文章要重點(diǎn)分析的主題Main Reactor Group的啟動(dòng),綁定端口地址,開(kāi)始監(jiān)聽(tīng)客戶(hù)端連接事件(OP_ACCEPT)。本文我們只關(guān)注創(chuàng)建流程。

f.channel().closeFuture().sync()等待服務(wù)端NioServerSocketChannel關(guān)閉。Netty服務(wù)端到這里正式啟動(dòng),并準(zhǔn)備好接受客戶(hù)端連接的準(zhǔn)備。

shutdownGracefully優(yōu)雅關(guān)閉主從Reactor線程組里的所有Reactor線程。

Netty對(duì)IO模型的支持

在上篇文章中我們介紹了五種IO模型,Netty中支持BIO,NIO,AIO以及多種操作系統(tǒng)下的IO多路復(fù)用技術(shù)實(shí)現(xiàn)。

在Netty中切換這幾種IO模型也是非常的方便,下面我們來(lái)看下Netty如何對(duì)這幾種IO模型進(jìn)行支持的。

首先我們介紹下幾個(gè)與IO模型相關(guān)的重要接口:

EventLoop

EventLoop就是Netty中的Reactor,可以說(shuō)它就是Netty的引擎,負(fù)責(zé)Channel上IO就緒事件的監(jiān)聽(tīng),IO就緒事件的處理,異步任務(wù)的執(zhí)行驅(qū)動(dòng)著整個(gè)Netty的運(yùn)轉(zhuǎn)。

不同IO模型下,EventLoop有著不同的實(shí)現(xiàn),我們只需要切換不同的實(shí)現(xiàn)類(lèi)就可以完成對(duì)NettyIO模型的切換。

在NIO模型下Netty會(huì)自動(dòng)根據(jù)操作系統(tǒng)以及版本的不同選擇對(duì)應(yīng)的IO多路復(fù)用技術(shù)實(shí)現(xiàn)。比如Linux 2.6版本以上用的是Epoll,2.6版本以下用的是Poll,Mac下采用的是Kqueue。

其中Linux kernel 在5.1版本引入的異步IO庫(kù)io_uring正在netty中孵化。

EventLoopGroup

Netty中的Reactor是以Group的形式出現(xiàn)的,EventLoopGroup正是Reactor組的接口定義,負(fù)責(zé)管理Reactor,Netty中的Channel就是通過(guò)EventLoopGroup注冊(cè)到具體的Reactor上的。

Netty的IO線程模型是主從Reactor多線程模型,主從Reactor線程組在Netty源碼中對(duì)應(yīng)的其實(shí)就是兩個(gè)EventLoopGroup實(shí)例。

不同的IO模型也有對(duì)應(yīng)的實(shí)現(xiàn):

ServerSocketChannel

用于Netty服務(wù)端使用的ServerSocketChannel,對(duì)應(yīng)于上篇文章提到的監(jiān)聽(tīng)Socket,負(fù)責(zé)綁定監(jiān)聽(tīng)端口地址,接收客戶(hù)端連接并創(chuàng)建用于與客戶(hù)端通信的SocketChannel。

不同的IO模型下的實(shí)現(xiàn):

SocketChannel

用于與客戶(hù)端通信的SocketChannel,對(duì)應(yīng)于上篇文章提到的客戶(hù)端連接Socket,當(dāng)客戶(hù)端完成三次握手后,由系統(tǒng)調(diào)用accept函數(shù)根據(jù)監(jiān)聽(tīng)Socket創(chuàng)建。

不同的IO模型下的實(shí)現(xiàn):

我們看到在不同IO模型的實(shí)現(xiàn)中,Netty這些圍繞IO模型的核心類(lèi)只是前綴的不同:

  • BIO對(duì)應(yīng)的前綴為Oio表示old io,現(xiàn)在已經(jīng)廢棄不推薦使用。
  • NIO對(duì)應(yīng)的前綴為Nio,正是Netty推薦也是我們常用的非阻塞IO模型。
  • AIO對(duì)應(yīng)的前綴為Aio,由于Linux下的異步IO機(jī)制實(shí)現(xiàn)的并不成熟,性能提升表現(xiàn)上也不明顯,現(xiàn)已被刪除。

我們只需要將IO模型的這些核心接口對(duì)應(yīng)的實(shí)現(xiàn)類(lèi)前綴改為對(duì)應(yīng)IO模型的前綴,就可以輕松在Netty中完成對(duì)IO模型的切換。

多種NIO的實(shí)現(xiàn)

我們通常在使用NIO模型的時(shí)候會(huì)使用Common列下的這些IO模型核心類(lèi),Common類(lèi)也會(huì)根據(jù)操作系統(tǒng)的不同自動(dòng)選擇JDK在對(duì)應(yīng)平臺(tái)下的IO多路復(fù)用技術(shù)的實(shí)現(xiàn)。

而Netty自身也根據(jù)操作系統(tǒng)的不同提供了自己對(duì)IO多路復(fù)用技術(shù)的實(shí)現(xiàn),比JDK的實(shí)現(xiàn)性能更優(yōu)。比如:

  • JDK的 NIO 默認(rèn)實(shí)現(xiàn)是水平觸發(fā),Netty 是邊緣觸發(fā)(默認(rèn))和水平觸發(fā)可切換。
  • Netty 實(shí)現(xiàn)的垃圾回收更少、性能更好。

我們編寫(xiě)Netty服務(wù)端程序的時(shí)候也可以根據(jù)操作系統(tǒng)的不同,采用Netty自身的實(shí)現(xiàn)來(lái)進(jìn)一步優(yōu)化程序。做法也很簡(jiǎn)單,直接將上圖中紅框里的實(shí)現(xiàn)類(lèi)替換成Netty的自身實(shí)現(xiàn)類(lèi)即可完成切換。

經(jīng)過(guò)以上對(duì)Netty服務(wù)端代碼編寫(xiě)模板以及IO模型相關(guān)核心類(lèi)的簡(jiǎn)單介紹,我們對(duì)Netty的創(chuàng)建流程有了一個(gè)簡(jiǎn)單粗略的總體認(rèn)識(shí),下面我們來(lái)深入剖析下創(chuàng)建流程過(guò)程中的每一個(gè)步驟以及這個(gè)過(guò)程中涉及到的核心類(lèi)實(shí)現(xiàn)。

以下源碼解析部分我們均采用Common列下NIO相關(guān)的實(shí)現(xiàn)進(jìn)行解析。

創(chuàng)建主從Reactor線程組

在Netty服務(wù)端程序編寫(xiě)模板的開(kāi)始,我們首先會(huì)創(chuàng)建兩個(gè)Reactor線程組:

  • 一個(gè)是主Reactor線程組bossGroup用于監(jiān)聽(tīng)客戶(hù)端連接,創(chuàng)建客戶(hù)端連接NioSocketChannel,并將創(chuàng)建好的客戶(hù)端連接NioSocketChannel注冊(cè)到從Reactor線程組中一個(gè)固定的Reactor上。
  • 一個(gè)是從Reactor線程組workerGroup,workerGroup中的Reactor負(fù)責(zé)監(jiān)聽(tīng)綁定在其上的客戶(hù)端連接NioSocketChannel上的IO就緒事件,并處理IO就緒事件,執(zhí)行異步任務(wù)。
  //創(chuàng)建主從Reactor線程組
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();

Netty中Reactor線程組的實(shí)現(xiàn)類(lèi)為NioEventLoopGroup,在創(chuàng)建bossGroup和workerGroup的時(shí)候用到了NioEventLoopGroup的兩個(gè)構(gòu)造函數(shù):

  • 帶nThreads參數(shù)的構(gòu)造函數(shù)public NioEventLoopGroup(int nThreads)。
  • 不帶nThreads參數(shù)的默認(rèn)構(gòu)造函數(shù)public NioEventLoopGroup()。
public class NioEventLoopGroup extends MultithreadEventLoopGroup {
/**
* Create a new instance using the default number of threads, the default {@link ThreadFactory} and
* the {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.
*/
public NioEventLoopGroup() {
this(0);
}
/**
* Create a new instance using the specified number of threads, {@link ThreadFactory} and the
* {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.
*/
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
......................省略...........................
}

nThreads參數(shù)表示當(dāng)前要?jiǎng)?chuàng)建的Reactor線程組內(nèi)包含多少個(gè)Reactor線程。不指定nThreads參數(shù)的話采用默認(rèn)的Reactor線程個(gè)數(shù),用0表示。

最終會(huì)調(diào)用到構(gòu)造函數(shù)。

  public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}

下面簡(jiǎn)單介紹下構(gòu)造函數(shù)中這幾個(gè)參數(shù)的作用,后面我們?cè)谥v解本文主線的過(guò)程中還會(huì)提及這幾個(gè)參數(shù),到時(shí)在詳細(xì)介紹,這里只是讓大家有個(gè)初步印象,不必做過(guò)多的糾纏。

  • Executor executor:負(fù)責(zé)啟動(dòng)Reactor線程進(jìn)而Reactor才可以開(kāi)始工作。

Reactor線程組NioEventLoopGroup負(fù)責(zé)創(chuàng)建Reactor線程,在創(chuàng)建的時(shí)候會(huì)將executor傳入。

  • RejectedExecutionHandler: 當(dāng)向Reactor添加異步任務(wù)添加失敗時(shí),采用的拒絕策略。Reactor的任務(wù)不只是監(jiān)聽(tīng)I(yíng)O活躍事件和IO任務(wù)的處理,還包括對(duì)異步任務(wù)的處理。這里大家只需有個(gè)這樣的概念,后面筆者會(huì)專(zhuān)門(mén)詳細(xì)介紹。
  • SelectorProvider selectorProvider:Reactor中的IO模型為IO多路復(fù)用模型,對(duì)應(yīng)于JDK NIO中的實(shí)現(xiàn)為java.nio.channels.Selector(就是我們上篇文章中提到的select,poll,epoll),每個(gè)Reator中都包含一個(gè)Selector,用于輪詢(xún)注冊(cè)在該Reactor上的所有Channel上的IO事件。SelectorProvider就是用來(lái)創(chuàng)建Selector的。
  • SelectStrategyFactory selectStrategyFactory: Reactor最重要的事情就是輪詢(xún)注冊(cè)其上的Channel上的IO就緒事件,這里的SelectStrategyFactory用于指定輪詢(xún)策略,默認(rèn)為DefaultSelectStrategyFactory.INSTANCE。

最終會(huì)將這些參數(shù)交給NioEventLoopGroup的父類(lèi)構(gòu)造器,下面我們來(lái)看下NioEventLoopGroup類(lèi)的繼承結(jié)構(gòu):

NioEventLoopGroup類(lèi)的繼承結(jié)構(gòu)乍一看比較復(fù)雜,大家不要慌,筆者會(huì)隨著主線的深入慢慢地介紹這些父類(lèi)接口,我們現(xiàn)在重點(diǎn)關(guān)注Mutithread前綴的類(lèi)。

我們知道NioEventLoopGroup是Netty中的Reactor線程組的實(shí)現(xiàn),既然是線程組那么肯定是負(fù)責(zé)管理和創(chuàng)建多個(gè)Reactor線程的,所以Mutithread前綴的類(lèi)定義的行為自然是對(duì)Reactor線程組內(nèi)多個(gè)Reactor線程的創(chuàng)建和管理工作。

MultithreadEventLoopGroup

public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(MultithreadEventLoopGroup.class);
//默認(rèn)Reactor個(gè)數(shù)
private static final int DEFAULT_EVENT_LOOP_THREADS;
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
}
}
/**
* @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)
*/
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
...................省略.....................
}

MultithreadEventLoopGroup類(lèi)主要的功能就是用來(lái)確定Reactor線程組內(nèi)Reactor的個(gè)數(shù)。

默認(rèn)的Reactor的個(gè)數(shù)存放于字段DEFAULT_EVENT_LOOP_THREADS中。

從static {}靜態(tài)代碼塊中我們可以看出默認(rèn)Reactor的個(gè)數(shù)的獲取邏輯:

  • 可以通過(guò)系統(tǒng)變量 -D io.netty.eventLoopThreads"指定。
  • 如果不指定,那么默認(rèn)的就是NettyRuntime.availableProcessors() * 2。

當(dāng)nThread參數(shù)設(shè)置為0采用默認(rèn)設(shè)置時(shí),Reactor線程組內(nèi)的Reactor個(gè)數(shù)則設(shè)置為DEFAULT_EVENT_LOOP_THREADS。

MultithreadEventExecutorGroup

MultithreadEventExecutorGroup這里就是本小節(jié)的核心,主要用來(lái)定義創(chuàng)建和管理Reactor的行為。

public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
//Reactor線程組中的Reactor集合
private final EventExecutor[] children;
private final Set readonlyChildren;
//從Reactor group中選擇一個(gè)特定的Reactor的選擇策略 用于channel注冊(cè)綁定到一個(gè)固定的Reactor上
private final EventExecutorChooserFactory.EventExecutorChooser chooser;
/**
* Create a new instance.
*
* @param nThreads the number of threads that will be used by this instance.
* @param executor the Executor to use, or {@code null} if the default should be used.
* @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call
*/
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
............................省略................................
}

首先介紹一個(gè)新的構(gòu)造器參數(shù)EventExecutorChooserFactory chooserFactory。當(dāng)客戶(hù)端連接完成三次握手后,Main Reactor會(huì)創(chuàng)建客戶(hù)端連接NioSocketChannel,并將其綁定到Sub Reactor Group中的一個(gè)固定Reactor,那么具體要綁定到哪個(gè)具體的Sub Reactor上呢?這個(gè)綁定策略就是由chooserFactory來(lái)創(chuàng)建的。默認(rèn)為DefaultEventExecutorChooserFactory。

下面就是本小節(jié)的主題Reactor線程組的創(chuàng)建過(guò)程:

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (executor == null) {
//用于創(chuàng)建Reactor線程
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads];
//循環(huán)創(chuàng)建reaactor group中的Reactor
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
//創(chuàng)建reactor
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
................省略................
}
}
}
//創(chuàng)建channel到Reactor的綁定策略
chooser = chooserFactory.newChooser(children);

................省略................

Set childrenSet = new LinkedHashSet(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

1、 創(chuàng)建用于啟動(dòng)Reactor線程的executor

在Netty Reactor Group中的單個(gè)Reactor的IO線程模型為上篇文章提到的單Reactor單線程模型,一個(gè)Reactor線程負(fù)責(zé)輪詢(xún)注冊(cè)其上的所有Channel中的IO就緒事件,處理IO事件,執(zhí)行Netty中的異步任務(wù)等工作。正是這個(gè)Reactor線程驅(qū)動(dòng)著整個(gè)Netty的運(yùn)轉(zhuǎn),可謂是Netty的核心引擎。

而這里的executor就是負(fù)責(zé)啟動(dòng)Reactor線程的,從創(chuàng)建源碼中我們可以看到executor的類(lèi)型為T(mén)hreadPerTaskExecutor。

ThreadPerTaskExecutor

public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
this.threadFactory = ObjectUtil.checkNotNull(threadFactory, "threadFactory");
}
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
}

我們看到ThreadPerTaskExecutor做的事情很簡(jiǎn)單,從它的命名前綴ThreadPerTask我們就可以猜出它的工作方式,就是來(lái)一個(gè)任務(wù)就創(chuàng)建一個(gè)線程執(zhí)行。而創(chuàng)建的這個(gè)線程正是netty的核心引擎Reactor線程。

在Reactor線程啟動(dòng)的時(shí)候,Netty會(huì)將Reactor線程要做的事情封裝成Runnable,丟給exexutor啟動(dòng)。

而Reactor線程的核心就是一個(gè)死循環(huán)不停的輪詢(xún)IO就緒事件,處理IO事件,執(zhí)行異步任務(wù)。一刻也不停歇,堪稱(chēng)996典范。

這里向大家先賣(mài)個(gè)關(guān)子,"Reactor線程是何時(shí)啟動(dòng)的呢??"

2、 創(chuàng)建ReactorReactor

線程組NioEventLoopGroup包含多個(gè)Reactor,存放于private final EventExecutor[] children數(shù)組中。

所以下面的事情就是創(chuàng)建nThread個(gè)Reactor,并存放于EventExecutor[] children字段中,

我們來(lái)看下用于創(chuàng)建Reactor的newChild(executor, args)方法:

newChild

newChild方法是MultithreadEventExecutorGroup中的一個(gè)抽象方法,提供給具體子類(lèi)實(shí)現(xiàn)。

protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;

這里我們解析的是NioEventLoopGroup,我們來(lái)看下newChild在該類(lèi)中的實(shí)現(xiàn):

public class NioEventLoopGroup extends MultithreadEventLoopGroup {
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
}
}

前邊提到的眾多構(gòu)造器參數(shù),這里會(huì)通過(guò)可變參數(shù)Object... args傳入到Reactor類(lèi)NioEventLoop的構(gòu)造器中。

這里介紹下新的參數(shù)EventLoopTaskQueueFactory queueFactory,前邊提到Netty中的Reactor主要工作是輪詢(xún)注冊(cè)其上的所有Channel上的IO就緒事件,處理IO就緒事件。除了這些主要的工作外,Netty為了極致的壓榨Reactor的性能,還會(huì)讓它做一些異步任務(wù)的執(zhí)行工作。既然要執(zhí)行異步任務(wù),那么Reactor中就需要一個(gè)隊(duì)列來(lái)保存任務(wù)。

這里的EventLoopTaskQueueFactory就是用來(lái)創(chuàng)建這樣的一個(gè)隊(duì)列來(lái)保存Reactor中待執(zhí)行的異步任務(wù)。

可以把Reactor理解成為一個(gè)單線程的線程池,類(lèi)似于JDK中的SingleThreadExecutor,僅用一個(gè)線程來(lái)執(zhí)行輪詢(xún)IO就緒事件,處理IO就緒事件,執(zhí)行異步任務(wù)。同時(shí)待執(zhí)行的異步任務(wù)保存在Reactor里的taskQueue中。

NioEventLoop

public final class NioEventLoop extends SingleThreadEventLoop {
//用于創(chuàng)建JDK NIO Selector,ServerSocketChannel
private final SelectorProvider provider;
//Selector輪詢(xún)策略 決定什么時(shí)候輪詢(xún),什么時(shí)候處理IO事件,什么時(shí)候執(zhí)行異步任務(wù)
private final SelectStrategy selectStrategy;
/**
* The NIO {@link Selector}.
*/
private Selector selector;
private Selector unwrappedSelector;
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory queueFactory) {
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
rejectedExecutionHandler);
this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
final SelectorTuple selectorTuple = openSelector();
this.selector = selectorTuple.selector;
this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
}

這里就正式開(kāi)始了Reactor的創(chuàng)建過(guò)程,我們知道Reactor的核心是采用的IO多路復(fù)用模型來(lái)對(duì)客戶(hù)端連接上的IO事件進(jìn)行監(jiān)聽(tīng),所以最重要的事情是創(chuàng)建Selector(JDK NIO 中IO多路復(fù)用技術(shù)的實(shí)現(xiàn))。

可以把Selector理解為我們上篇文章介紹的Select,poll,epoll,它是JDK NIO對(duì)操作系統(tǒng)內(nèi)核提供的這些IO多路復(fù)用技術(shù)的封裝。

openSelector

openSelector是NioEventLoop類(lèi)中用于創(chuàng)建IO多路復(fù)用的Selector,并對(duì)創(chuàng)建出來(lái)的JDK NIO 原生的Selector進(jìn)行性能優(yōu)化。

首先會(huì)通過(guò)SelectorProvider#openSelector創(chuàng)建JDK NIO原生的Selector。

 private SelectorTuple openSelector() {
final Selector unwrappedSelector;
try {
//通過(guò)JDK NIO SelectorProvider創(chuàng)建Selector
unwrappedSelector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}

..................省略.............
}

SelectorProvider會(huì)根據(jù)操作系統(tǒng)的不同選擇JDK在不同操作系統(tǒng)版本下的對(duì)應(yīng)Selector的實(shí)現(xiàn)。Linux下會(huì)選擇Epoll,Mac下會(huì)選擇Kqueue。

下面我們就來(lái)看下SelectorProvider是如何做到自動(dòng)適配不同操作系統(tǒng)下IO多路復(fù)用實(shí)現(xiàn)的。

SelectorProvider

  public NioEventLoopGroup(ThreadFactory threadFactory) {
this(0, threadFactory, SelectorProvider.provider());
}

SelectorProvider是在前面介紹的NioEventLoopGroup類(lèi)構(gòu)造函數(shù)中通過(guò)調(diào)用SelectorProvider.provider()被加載,并通過(guò)NioEventLoopGroup#newChild方法中的可變長(zhǎng)參數(shù)Object... args傳遞到NioEventLoop中的private final SelectorProvider provider字段中。

SelectorProvider的加載過(guò)程:

public abstract class SelectorProvider {
public static SelectorProvider provider() {
synchronized (lock) {
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction() {
public SelectorProvider run() {
if (loadProviderFromProperty())
return provider;
if (loadProviderAsService())
return provider;
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}
}

從SelectorProvider加載源碼中我們可以看出,SelectorProvider的加載方式有三種,優(yōu)先級(jí)如下:

通過(guò)系統(tǒng)變量-D java.nio.channels.spi.SelectorProvider指定SelectorProvider的自定義實(shí)現(xiàn)類(lèi)全限定名。通過(guò)應(yīng)用程序類(lèi)加載器(Application Classloader)加載。

通過(guò)SPI方式加載。在工程目錄META-INF/services下定義名為java.nio.channels.spi.SelectorProvider的SPI文件,文件中第一個(gè)定義的SelectorProvider實(shí)現(xiàn)類(lèi)全限定名就會(huì)被加載。

  private static boolean loadProviderAsService() {
ServiceLoader sl =
ServiceLoader.load(
新聞標(biāo)題:聊聊Netty那些事兒之Reactor在Netty中的實(shí)現(xiàn)(創(chuàng)建篇)
網(wǎng)站地址:http://www.5511xx.com/article/dpisddo.html