新聞中心
TCP 協(xié)議的核心概念
要了解服務器的工作原理首先需要了解 TCP 協(xié)議的工作原理。TCP 是一種面向連接的、可靠的、基于字節(jié)流的傳輸層全雙工通信協(xié)議,它有 4 個特點:面向連接、可靠、流式、全雙工。下面詳細講解這些特性。

面向連接
TCP 中的連接是一個虛擬的連接,本質上是主機在內存里記錄了對端的信息,我們可以將連接理解為一個通信的憑證。如下圖所示。
那么如何建立連接呢?TCP 的連接是通過三次握手建立的。
服務器首先需要監(jiān)聽一個端口。
客戶端主動往服務器監(jiān)聽的端口發(fā)起一個 syn 包(第一次握手)。
當服務器所在操作系統(tǒng)收到一個 syn 包時,會先根據 syn 包里的目的 IP 和端口找到對應的監(jiān)聽 socket,如果找不到則回復 rst 包,如果找到則發(fā)送 ack 給客戶端(第二次握手),接著新建一個通信 socket 并插入到監(jiān)聽 socket 的連接中隊列(具體的細節(jié)會隨著不同版本的操作系統(tǒng)而變化。比如連接中隊列和連接完成隊列是一條隊列還是兩條隊列,再比如是否使用了 syn cookie 技術來防止 syn flood 攻擊,如果使用了,收到 syn 包的時候就不會創(chuàng)建 socket,而是收到第三次握手的包時再創(chuàng)建)。
客戶端收到服務器的 ack 后,再次發(fā)送 ack 給服務器,客戶端就完成三次握手進入連接建立狀態(tài)了。
當服務器所在操作系統(tǒng)收到客戶端的 ack 時(第三次握手),處于連接中隊列的 socket 就會被移到連接完成隊列中。
當操作系統(tǒng)完成了一個 TCP 連接,操作系統(tǒng)就會通知相應的進程,進程從連接完成隊列中摘下一個已完成連接的 socket 結點,然后生成一個新的 fd,后續(xù)就可以在該 fd 上和對端通信。具體的流程如下圖所示。
完成三次握手后,客戶端和服務器就可以進行數(shù)據通信了。操作系統(tǒng)收到數(shù)據包和收到 syn 包的流程不一樣,操作系統(tǒng)會根據報文中的 IP 和端口找到處理該報文的通信 socket(而不是監(jiān)聽 socket),然后把數(shù)據包(操作系統(tǒng)實現(xiàn)中是一個 skb 結構體)掛到該通信 socket 的數(shù)據隊列中。
當應用層調用 read 讀取該 socket 的數(shù)據時,操作系統(tǒng)會根據應用層所需大小,從一個或多個 skb 中返回對應的字節(jié)數(shù)。同樣,寫也是類似的流程,當應用層往 socket 寫入數(shù)據時,操作系統(tǒng)不一定會立刻發(fā)送出去,而是會保存到寫緩沖區(qū)中,然后根據復雜的 TCP 算法發(fā)送。
當兩端完成通信后需要關閉連接,否則會浪費內存。TCP 通過四次揮手實現(xiàn)連接的斷開,第一次揮手可以由任意一端發(fā)起。前面講過 TCP 是全雙工的,所以除了通過四次揮手完成整個 TCP 連接的斷開外,也可以實現(xiàn)半斷開,比如客戶端關閉寫端表示不會再發(fā)送數(shù)據,但是仍然可以讀取來自對端發(fā)送端數(shù)據。四次揮手的流程如下。
可靠
TCP 發(fā)送數(shù)據時會先緩存一份到已發(fā)送待確認隊列中,并啟動一個超時重傳計時器,如果一定時間內沒有收到對端到確認 ack,則觸發(fā)重傳機制,直到收到 ack 或者重傳次數(shù)達到閾值才會結束流程。
流式
建立連接后,應用層就可以調用發(fā)送接口源源不斷地發(fā)送數(shù)據。通常情況下,并不是每次調用發(fā)送接口,操作系統(tǒng)就直接把數(shù)據發(fā)送出去,這些數(shù)據的發(fā)送是由操作系統(tǒng)按照一定的算法去發(fā)送的。對操作系統(tǒng)來說,它看到的是字節(jié)流,它會按照 TCP 算法打包出一個個包發(fā)送到對端,所以當對端收到數(shù)據后,需要處理好數(shù)據邊界的問題。
從上圖中可以看到,假設應用層發(fā)送了兩個 HTTP 請求,操作系統(tǒng)在打包數(shù)據發(fā)送時可能的場景是第一個包里包括了 HTTP 請求 1 的全部數(shù)據和部分請求 2 的數(shù)據,所以當對端收到數(shù)據并進行解析時,就需要根據 HTTP 協(xié)議準確地解析出第一個 HTTP 請求對應的數(shù)據。
因為 TCP 的流式協(xié)議,所以基于 TCP 的應用層通常需要定義一個應用層協(xié)議,然后按照應用層協(xié)議實現(xiàn)對應的解析器,這樣才能完成有效的數(shù)據通信,比如常用的 HTTP 協(xié)議。對比來說 UDP 是面向數(shù)據包的協(xié)議,當應用層把數(shù)據傳遞給 UDP 時,操作系統(tǒng)會直接打包發(fā)送出去(如果數(shù)據字節(jié)大小超過閾值則會報錯)。
全雙工
剛才提到 TCP 是全雙工的,全雙工就是通信的兩端都有一個發(fā)送隊列和接收隊列,可以同時發(fā)送和接收,互不影響。另外也可以選擇關閉讀端或者寫端。
服務器的工作原理
介紹了 TCP 協(xié)議的概念后,接著看看如何創(chuàng)建一個 TCP 服務器(偽代碼)。
// 創(chuàng)建一個 socket,拿到一個文件描述符
int server_fd = socket();
// 綁定地址(IP + 端口)到該 socket 中
bind(server_fd, addressInfo);
// 修改 socket 為監(jiān)聽狀態(tài),這樣就可以接收 TCP 連接了
listen(server_fd);執(zhí)行完以上步驟,一個服務器就啟動了。服務器啟動的時候會監(jiān)聽一個端口,如果有連接到來,我們可以通過 accept 系統(tǒng)調用拿到這個新連接對應的 socket,那這個 socket 和監(jiān)聽的 socket 是不是同一個呢?
其實 socket 分為監(jiān)聽型和通信型。表面上服務器用一個端口實現(xiàn)了多個連接,但是這個端口是用于監(jiān)聽的,底層用于和客戶端通信的其實是另一個 socket。每當一個連接到來的時候,操作系統(tǒng)會根據請求包的目的地址信息找到對應的監(jiān)聽 socket,如果找不到就會回復 RST 包,如果找到就會生成一個新的 socket 與之通信(accept 的時候返回的那個)。監(jiān)聽 socket 里保存了監(jiān)聽的 IP 和端口,通信 socket 首先從監(jiān)聽 socket 中復制 IP 和端口,然后把客戶端的 IP 和端口也記錄下來。這樣一來,下次再收到一個數(shù)據包,操作系統(tǒng)就會根據四元組從 socket 池子里找到該 socket,完成數(shù)據的處理。因此理論上,一個服務器能接受多少連接取決于服務器的硬件配置,比如內存大小。
接下來分析各種處理連接的方式。
串行模式
串行模式就是服務器逐個處理連接,處理完前面的連接后才能繼續(xù)處理后面的連接,邏輯如下。
while(1) {
int client_fd = accept(server_fd);
read(client_fd);
write(client_fd);
}上面的處理方式是最樸素的模型,如果沒有連接,則服務器處于阻塞狀態(tài),如果有連接服務器就會不斷地調用 accept 摘下完成三次握手的連接并處理。假設此時有 n 個請求到來,進程會從 accept 中被喚醒,然后拿到一個新的 socket 用于通信,結構圖如下。
這種處理模式下,如果處理的過程中調用了阻塞 API,比如文件 IO,就會影響后面請求的處理,可想而知,效率是非常低的,而且,并發(fā)量比較大的時候,監(jiān)聽 socket 對應的隊列很快就會被占滿(已完成連接隊列有一個最大長度),導致后面的連接無法完成。這是最簡單的模式,雖然服務器的設計中肯定不會使用這種模式,但是它讓我們了解了一個服務器處理請求的整體過程。
多進程模式
串行模式中,所有請求都在一個進程中排隊被處理,效率非常低下。為了提高效率,我們可以把請求分給多個進程處理。因為在串行處理的模式中,如果有文件 IO 操作就會阻塞進程,繼而阻塞后續(xù)請求的處理。在多進程的模式中,即使一個請求阻塞了進程,操作系統(tǒng)還可以調度其它進程繼續(xù)執(zhí)行新的任務。多進程模式分為幾種。
按需 fork
按需 fork 模式是主進程監(jiān)聽端口,有連接到來時,主進程執(zhí)行 accept 摘取連接,然后通過 fork 創(chuàng)建子進程處理連接,邏輯如下。
while(1) {
int client_fd = accept(socket);
// 忽略出錯處理
if (fork() > 0) {
continue;
// 父進程負責 accept
} else {
// 子進程負責處理連接
handle(client_fd);
exit();
}
}這種模式下,每次來一個請求,就會新建一個進程去處理,比串行模式稍微好了一點,每個請求都被獨立處理。假設 a 請求阻塞在文件 IO,不會影響 b 請求的處理,盡可能地做到了并發(fā)。它的缺點是
1. 進程數(shù)有限,如果有大量的請求,需要排隊處理。
2. 進程的開銷會很大,對于系統(tǒng)來說是一個負擔。
3. 創(chuàng)建進程需要時間,實時創(chuàng)建會增加處理請求的時間。
pre-fork 模式 + 主進程 accept
pre-fork 模式就是服務器啟動的時候,預先創(chuàng)建一定數(shù)量的進程,但是這些進程是 worker 進程,不負責接收連接,只負責處理請求。處理過程為主進程負責接收連接,然后把接收到的連接交給 worker 進程處理,流程如下。
邏輯如下:
let fds = [[], [], [], …進程個數(shù)];
let process = [];
for (let i = 0 ; i < 進程個數(shù); i++) {
// 創(chuàng)建管道用于傳遞文件描述符
socketpair(fds[i]);
let pid;
if (pid = fork() > 0) {
// 父進程
process.push({pid, 其它字段});
} else {
const index = i;
// 子進程處理請求
while(1) {
// 從管道中讀取文件描述符
var client_fd = read(fd[index][1]);
// 處理請求
handle(client_fd);
}
}
}
// 主進程 accept
for (;;) {
const clientFd = accept(socket);
// 找出處理該請求的子進程
const i = findProcess();
// 傳遞文件描述符
write(fds[i][0], clientFd);
}和 fork 模式相比,pre-fork 模式相對比較復雜,因為在前一種模式中,主進程收到一個請求就會實時 fork 一個子進程,這個子進程會繼承主進程中新請求對應的 fd,可以直接處理該 fd 對應的請求。但是在進程池的模式中,子進程是預先創(chuàng)建的,當主進程收到一個請求的時候,子進程中無法拿得到該請求對應的 fd 。這時候就需要主進程使用傳遞文件描述符的技術把這個請求對應的 fd 傳給子進程。
pre-fork 模式 + 子進程 accept
剛才介紹的模式中,是主進程接收連接,然后傳遞給子進程處理,這樣主進程就會成為系統(tǒng)的瓶頸,它可能來不及接收和分發(fā)請求給子進程,而子進程卻很空閑。子進程 accept 這種模式也是會預先創(chuàng)建多個進程,區(qū)別是多個子進程會調用 accept 共同處理請求,而不需要父進程參與,邏輯如下。
int server_fd = socket();
bind(server_fd);
for (let i = 0 ; i < 進程個數(shù); i++) {
if (fork() > 0) {
// 父進程負責監(jiān)控子進程
} else {
// 子進程處理請求
listen(server_fd);
while(1) {
int client_fd = accept(socket);
handle(client_fd);
}
}
}這種模式下多個子進程都阻塞在 accept,如果這時候有一個請求到來,那么所有的子進程都會被喚醒,但是先被調度的子進程會摘下這個請求節(jié)點,后續(xù)的進程被喚醒后可能會遇到已經沒有請求可以處理,而又進入睡眠,這種進程被無效喚醒的現(xiàn)象就是著名的驚群現(xiàn)象。這種模式的處理流程如下。
Nginx 中解決了驚群這個問題,它的處理方式是在 accpet 之前先加鎖,拿到鎖的進程才進行 accept,這樣就保證了只有一個進程會阻塞在 accept,不會引起驚群問題,但是新版操作系統(tǒng)已經在內核層面解決了這個問題,每次只會喚醒一個進程。
多線程模式
除了使用多進程外,也可以使用多線程技術處理連接,多線程模式和多進程模式類似,區(qū)別是在進程模式中,每個子進程都有自己的 task_struct,這就意味著在 fork 之后,每個進程負責維護自己的數(shù)據、資源。線程則不一樣,線程共享進程的數(shù)據和資源,所以連接可以在多個線程中共享,不需要通過文件描述符傳遞的方式進行處理,比如如下架構。
上圖中,主線程負責 accept 請求,然后通過互斥的方式插入一個任務到共享隊列中,線程池中的子線程同樣是通過互斥的方式,從共享隊列中摘取節(jié)點進行處理。
事件驅動
從之前的處理模式中我們知道,為了應對大量的請求,服務器通常需要大量的進程 / 線程,這是個非常大的開銷?,F(xiàn)在很多服務器(Nginx、Nodejs、Redis)都開始使用單進程 + 事件驅動模式去設計,這種模式可以在單個進程中輕松處理成千上萬的請求。
但也正因為單進程模式下,再多的請求也只在一個進程里處理,這樣一個任務會一直在占據 CPU,后續(xù)的任務就無法執(zhí)行了。因此,事件驅動模式不適合 CPU 密集型的場景,更適合 IO 密集的場景(一般都會提供線程 / 線程池,負責處理 CPU 或者阻塞型的任務)。大部分操作系統(tǒng)都提供了事件驅動的 API,但是事件驅動在不同系統(tǒng)中實現(xiàn)不一樣,所以一般都會有一層抽象層抹平這個差異。這里以 Linux 的 epoll 為例子。
// 創(chuàng)建一個 epoll 實例
int epoll_fd = epoll_create();
/*
在 epoll 給某個文件描述符注冊感興趣的事件,這里是監(jiān)聽的 socket,注冊可讀事件,即連接到來
event = {
event: 可讀
fd:監(jiān)聽 socket
// 一些上下文
}
*/
epoll_ctl(epoll_fd , EPOLL_CTL_ADD , socket, event);
while(1) {
// 阻塞等待事件就緒,events 保存就緒事件的信息,total 是個數(shù)
int total= epoll_wait(epoll_fd , 保存就緒事件的結構events, 事件個數(shù), timeout);
for (let i = 0; i < total; i++) {
if (events[fd] === 監(jiān)聽 socket) {
int client_fd = accpet(socket);
// 把新的 socket 也注冊到 epoll,等待可讀,即可讀取客戶端數(shù)據
epoll_ctl(epoll_fd , EPOLL_CTL_ADD , client_fd, event);
} else {
// 從events[i] 中拿到一些上下文,執(zhí)行相應的回調
}
}
}事件驅動模式的處理流程為服務器注冊文件描述符和事件到 epoll 中,然后 epoll 開始阻塞,當有事件觸發(fā)時 epoll 就會返回哪些 fd 的哪些事件觸發(fā)了,接著服務器遍歷就緒事件并執(zhí)行對應的回調,在回調里可以再次注冊 / 刪除事件,就這樣不斷驅動著進程的運行。epoll 的原理其實也類似事件驅動,它底層維護用戶注冊的事件和文件描述符,本身也會在文件描述符對應的文件 / socket / 管道處注冊一個回調,等被通知有事件發(fā)生的時候,就會把 fd 和事件返回給用戶,大致原理如下。
function epoll_wait() {
for 事件個數(shù)
// 調用文件系統(tǒng)的函數(shù)判斷
if (事件 [i] 中對應的文件描述符中有某個用戶感興趣的事件發(fā)生 ?) {
插入就緒事件隊列
} else {
/*
在事件 [i] 中的文件描述符所對應的文件 / socke / 管道等資源中注冊回調。
感興趣的事件觸發(fā)后回調 epoll,回調 epoll 后,epoll 把該 event[i] 插入
就緒事件隊列返回給用戶
*/
}
}SO_REUSEPORT 端口復用
新版 Linux 支持 SO_REUSEPORT 特性后,使得服務器性能有了很大的提升。SO_REUSEPORT 之前,一個 socket 是無法綁定到同一個地址的,通常的做法是主進程接收連接然后傳遞給子進程處理,或者主進程 bind 后 fork 子進程,然后子進程執(zhí)行 listen,但底層是共享同一個 socket,所以連接到來時所有子進程都會被喚醒,但是只有一個連接可以處理這個請求,其他的進程被無效喚醒。SO_REUSEPORT 特性支持多個子進程對應多個監(jiān)聽 socket,多個 socket 綁定到同一個地址,當連接到來時,操作系統(tǒng)會根據地址信息找到一組 socket,然后根據策略選擇一個 socket 并喚醒阻塞在該 socket 的進程,被 socket 喚醒的進程處理自己的監(jiān)聽 socket 下的連接就行,架構如下。
除了前面介紹的模式外,還有基于協(xié)程的模式,服務器技術繁多,就不一一介紹了。
IO 模型
IO 模型是服務器中非常重要的部分,操作系統(tǒng)通常會提供了多種 IO 模型,常見的如下。
阻塞 IO
當線程執(zhí)行一個 IO 操作時,如果不滿足條件,當前線程會被阻塞,然后操作系統(tǒng)會調度其他線程執(zhí)行。
非阻塞 IO
非阻塞 IO 在不滿足條件的情況下直接返回一個錯誤碼給線程,而不是阻塞線程。
那么這個阻塞是什么意思呢?直接看一段操作系統(tǒng)的代碼。
// 沒有空間可寫了
while(!(space = UN_BUF_SPACE(pupd)))
{
// 非阻塞模式,直接返回錯誤碼
if (nonblock)
return(-EAGAIN);
// 阻塞模式,進入阻塞狀態(tài)
interruptible_sleep_on(sock->wait);
}
void interruptible_sleep_on(struct wait_queue **p)
{
// 修改線程狀態(tài)為阻塞狀態(tài)
__sleep_on(p,TASK_INTERRUPTIBLE);
}
static inline void __sleep_on(struct wait_queue **p, int state)
{
unsigned long flags;
// current 代表當前執(zhí)行的線程
struct wait_queue wait = { current, NULL };
// 修改線程狀態(tài)為阻塞狀態(tài)
current->state = state;
// 當前線程加入到資源的阻塞隊列,資源就緒后喚醒線程
add_wait_queue(p, &wait);
// 重新調度其他線程執(zhí)行,即從就緒的線程中選擇一個來執(zhí)行
schedule();
}通過這段代碼,我們就可以非常明確地了解到阻塞和非阻塞到底是指什么。
多路復用 IO
在阻塞式 IO 中,我們需要通過阻塞進程來感知 IO 是否就緒,在非阻塞式 IO 中,我們需要通過輪詢來感知 IO 是否就緒,這些都不是合適的方式。為了更好感知 IO 是否就緒,操作系統(tǒng)實現(xiàn)了訂閱發(fā)布機制,我們只需要注冊感興趣的 fd 和事件,當事件發(fā)生時我們就可以感知到。多路復用 IO 可以同時訂閱多個 fd 的多個事件,是現(xiàn)在高性能服務器的基石??匆粋€例子。
#include
#include
#include
int main(int argc, char **argv)
{
// 用于注冊事件到 kqueue
struct kevent event;
// 用于接收從 kqueue 返回到事件,表示哪個 fd 觸發(fā)了哪些事件
struct kevent emit_event;
int kqueue_fd, file_fd, result;
// 打開需要監(jiān)控的文件,拿到一個 fd
file_fd = open(argv[1], O_RDONLY);
if (file_fd == -1) {
printf("Fail to open %s", argv[1]);
return 1;
}
// 創(chuàng)建 kqueue 實例
kqueue_fd = kqueue();
// 設置需要監(jiān)聽的事件,文件被寫入時觸發(fā)
EV_SET(&event,file_fd, EVFILT_VNODE, EV_ADD | EV_CLEAR, NOTE_RENAME, 0, NULL);
// 注冊到操作系統(tǒng)
result = kevent(kqueue_fd, &event, 1, NULL, 0, NULL);
// 不斷阻塞等待,直到文件被寫入
while(1) {
// result 返回觸發(fā)事件的 fd 個數(shù),這里是一個
result = kevent(kqueue_fd, NULL, 0, &emit_event, 1, NULL);
if (result > 0) {
printf("%s have been renamed\n", argv[1]);
}
}
} 異步 IO
前面介紹的幾種 IO 模型中,當 IO 就緒時需要自己執(zhí)行讀寫操作,而異步 IO 是 IO 就緒時,操作系統(tǒng)幫助線程完成 IO 操作,然后再通知線程操作完成了。下面以 io_uring(Linux 中的異步 IO 框架) 為例了解下具體的情況。
uv_loop_t* loop;
napi_get_uv_event_loop(env, &loop);
struct io_uring_info *io_uring_data = (io_uring_info *)loop->data;
// 申請內存
struct request *req = (struct request *)malloc(sizeof(*req) + (sizeof(struct iovec) * 1));
req->fd = fd;
req->offset = offset;
// 保存回調
napi_create_reference(env, args[2], 1, &req->func);
req->env = env;
req->nvecs = 1;
// 記錄buffer大小
req->iovecs[0].iov_len = bufferLength;
// 記錄內存地址
req->iovecs[0].iov_base = bufferData;
// 提交給操作系統(tǒng),操作系統(tǒng)讀完后通知線程,op 為 IORING_OP_READV 表示讀操作
submit_request(op, req, &io_uring_data->ring);上面的代碼就是我們提交了一個讀請求給操作系統(tǒng),然后操作系統(tǒng)在文件可讀并且讀完成后通知我們。
Libuv 雖然寫著是異步 IO 庫,但是它并不是真正的異步 IO。它的意思是,你提交一個 IO 請求時,可以注冊一個回調,然后就可以去做其他事情了,等操作完成后它會通知你,它的底層實現(xiàn)是線程池 + 多路復用 IO。
Node.js TCP 服務器的實現(xiàn)
Node.js 服務器的底層是 IO 多路復用 + 非阻塞 IO,所以可以輕松處理成千上萬的請求,但是因為 Node.js 是單線程的,所以更適合處理 IO 密集型的任務。下面看看 Node.js 中服務器是如何實現(xiàn)的。
啟動服務器
在 Node.js 中,我們通常使用以下方式創(chuàng)建一個服務器。
// 創(chuàng)建一個 TCP Server
const server = net.createServer((socket) => {
// 處理連接
});
// 監(jiān)聽端口,啟動服務器
server.listen(8888);使用 net.createServer 可以創(chuàng)建一個服務器,然后拿到一個 Server 對象,接著調用 Server 對象的 listen 函數(shù)就可以啟動一個 TCP 服務器了。下面來看一下具體的實現(xiàn)。
function createServer(options, connectionListener) {
return new Server(options, connectionListener);
}
function Server(options, connectionListener) {
EventEmitter.call(this);
// 服務器收到的連接數(shù),可以通過 maxConnections 限制并發(fā)連接數(shù)
this._connections = 0;
// C++ 層的對象,真正實現(xiàn) TCP 功能的地方
this._handle = null;
// 服務器下的連接是否允許半關閉
this.allowHalfOpen = options.allowHalfOpen || false;
// 有連接時是否注冊可讀事件,如果該 socket 是交給其他進程處理的話可以設置為 true
this.pauseOnConnect = !!options.pauseOnConnect;
}createServer 返回的是一個一般的 JS 對象,繼續(xù)看一下 listen 函數(shù)的邏輯,listen 函數(shù)邏輯很繁瑣,但是原理大致是一樣的,所以只講解常用的情況。
Server.prototype.listen = function(...args) {
/*
處理入參,listen 可以接收很多種格式的參數(shù),
假設我們這里只傳了 8888 端口號
*/
const normalized = normalizeArgs(args);
// normalized = [{port: 8888}, null];
const options = normalized[0];
// 監(jiān)聽成功后的回調
const cb = normalized[1];
// listen 成功后執(zhí)行的回調
if (cb !== null) {
this.once('listening', cb);
}
listenIncluster(this,
null,
options.port | 0,
4,
...);
return this;
};listen 處理了入參后,接著調用了 listenIncluster。
function listenIncluster(server,
address,
port,
addressType,
backlog,
fd,
exclusive) {
exclusive = !!exclusive;
if (cluster === null) cluster = require('cluster');
if (cluster.isMaster || exclusive) {
server._listen2(address, port, addressType, backlog, fd);
return;
}
}這里只分析在主進程創(chuàng)建服務器的情況,listenIncluster 中執(zhí)行了 _listen2,_listen2 對應的函數(shù)是 setupListenHandle。
function setupListenHandle(address, port, addressType, backlog, fd) {
// 通過 C++ 層導出的 API 創(chuàng)建一個對象,該對象關聯(lián)了 C++ 層的 TCPWrap 對象
this._handle = new TCP(TCPConstants.SERVER);
// 創(chuàng)建 socket 并綁定地址到 socket 中
this._handle.bind(address, port);
// 有完成三次握手的連接時執(zhí)行的回調
this._handle.onconnection = onconnection;
// 互相關聯(lián)
this._handle.owner = this;
// 執(zhí)行 C++ 層 listen
this._handle.listen(backlog || 511);
// 觸發(fā) listen 回調
nextTick(this[async_id_symbol], emitListeningNT, this);
}setupListenHandle 的邏輯如下。
1. 調用 new TCP 創(chuàng)建一個 handle(new TCP 對象關聯(lián)了 C++ 層的 TCPWrap 對象)。
2. 保存處理連接的函數(shù) onconnection,當有連接時被執(zhí)行。
3. 調用了 bind 綁定地址到 socket。
4. 調用 listen 函數(shù)修改 socket 狀態(tài)為監(jiān)聽狀態(tài)。首先看看 new TCP 做了什么。
void TCPWrap::New(const FunctionCallbackInfo& args) {
new TCPWrap(env, args.This(), ...);
}
TCPWrap::TCPWrap(Environment* env, Local new TCP 本質上是創(chuàng)建一個 TCP 層的 TCPWrap 對象,并初始化了 Libuv 的數(shù)據結構 uv_tcp_t(TCPWrap 是對 Libuv uv_tcp_t 的封裝)。接著看 bind。
template
void TCPWrap::Bind(...) {
// 通過 JS 對象拿到關聯(lián)的 C++ TCPWrap 對象
TCPWrap* wrap;
ASSIGN_OR_RETURN_UNWRAP(&wrap,
args.Holder(),
args.GetReturnValue().Set(UV_EBADF));
// 通過 JS 傳入的地址信息直接調用 Libuv
uv_tcp_bind(&wrap->handle_,
reinterpret_cast(&addr),
flags);
} Bind 函數(shù)的邏輯很簡單,直接調用了 Libuv 函數(shù)。
int uv_tcp_bind(...) {
return uv__tcp_bind(handle, addr, addrlen, flags);
}
int uv__tcp_bind(uv_tcp_t* tcp,
const struct sockaddr* addr,
unsigned int addrlen,
unsigned int flags) {
// 創(chuàng)建一個 socket,并把返回的 fd 保存到 tcp 結構體中
maybe_new_socket(tcp, addr->sa_family, 0);
on = 1;
// 默認設置了 SO_REUSEADDR 屬性,后面具體分析
setsockopt(tcp->io_watcher.fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
// 綁定地址信息到 socket
bind(tcp->io_watcher.fd, addr, addrlen);
return 0;
}uv__tcp_bind 創(chuàng)建了一個 TCP socket 然后把地址信息保存到該 socket 中,執(zhí)行 bind 綁定了地址信息后就繼續(xù)調用 listen 把 socket 變成監(jiān)聽狀態(tài),C++ 層代碼和 Bind 的差不多,就不再分析,直接看 Libuv 的代碼。
int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) {
uv_tcp_listen((uv_tcp_t*)stream, backlog, cb);
}
int uv_tcp_listen(uv_tcp_t* tcp, int backlog, uv_connection_cb cb) {
static int single_accept = -1;
unsigned long flags;
int err;
// 已廢棄
if (single_accept == -1) {
const char* val = getenv("UV_TCP_SINGLE_ACCEPT");
single_accept = (val != NULL && atoi(val) != 0);
}
// 有連接時是否連續(xù)接收,或者間歇性處理,見后面分析
if (single_accept)
tcp->flags |= UV_HANDLE_TCP_SINGLE_ACCEPT;
flags = 0;
// 設置 flags 到 handle 上,因為已經創(chuàng)建了 socket
maybe_new_socket(tcp, AF_INET, flags);
listen(tcp->io_watcher.fd, backlog)
// 保存回調,有連接到來時被 Libuv 執(zhí)行
tcp->connection_cb = cb;
tcp->flags |= UV_HANDLE_BOUND;
// 有連接來時的處理函數(shù),該函數(shù)再執(zhí)行上面的 connection_cb
tcp->io_watcher.cb = uv__server_io;
// 注冊可讀事件,等待連接到來
uv__io_start(tcp->loop, &tcp->io_watcher, POLLIN);
return 0;
}uv_tcp_listen 首先調用了 listen 函數(shù)修改 socket 狀態(tài)為監(jiān)聽狀態(tài),這樣才能接收 TCP 連接,接著保存了 C++ 層的回調,并設置 Libuv 層的回調,最后注冊可讀事件等待 TCP 連接的到來。這里需要注意兩個回調函數(shù)的執(zhí)行順序,當有 TCP 連接到來時 Libuv 會執(zhí)行 uvserver_io,在 uvserver_io 里再執(zhí)行 C++ 層的回調 cb。至此,服務器就啟動了。其中 uv__io_start 最終會把服務器對應的文件描述符注冊到 IO多路 復用模塊中。
處理連接
當有三次握手的連接完成時,操作系統(tǒng)會新建一個通信的 socket,并通知 Libuv,Libuv 會執(zhí)行 uv__server_io。
void uv__server_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
uv_stream_t* stream;
int err;
stream = container_of(w, uv_stream_t, io_watcher);
uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
// 回調了可能關閉了 server,所以需要實時判斷
while (uv__stream_fd(stream) != -1) {
// 摘取一個 TCP 連接,成功的話,err 保存了對應的 fd
err = uv__accept(uv__stream_fd(stream));
// 保存 fd 在 accepted_fd,等待處理
stream->accepted_fd = err;
// 執(zhí)行回調
stream->connection_cb(stream, 0);
// 如果回調里沒有處理該 accepted_fd,則注銷可讀事件、先不處理新的連接
if (stream->accepted_fd != -1) {
uv__io_stop(loop, &stream->io_watcher, POLLIN);
return;
}
// 設置了 UV_HANDLE_TCP_SINGLE_ACCEPT 則進入睡眠,讓其他進程有機會參與處理
if (stream->type == UV_TCP &&
(stream->flags & UV_HANDLE_TCP_SINGLE_ACCEPT)) {
struct timespec timeout = { 0, 1 };
nanosleep(&timeout, NULL);
}
}
}uvserver_io 中通過 uvaccept 從操作系統(tǒng)中摘取一個完成連接的 TCP socket 并拿到一個 fd ,接著保存到 accepted_fd 中并執(zhí)行 connection_cb 回調。
此外,我們需要注意 UV_HANDLE_TCP_SINGLE_ACCEPT 標記。因為可能有多個進程監(jiān)聽同一個端口,當多個連接到來時,多個進程可能會競爭處理這些連接(驚群問題)。這樣一來,首先被調度的進程可能會直接處理所有的連接,導致負載不均衡。通過 UV_HANDLE_TCP_SINGLE_ACCEPT 標記,可以在通知進程接收連接時,每接收到一個后先睡眠一段時間,讓其他進程也有機會接收連接,一定程度解決負載不均衡的問題,不過這個邏輯最近被去掉了,Libuv 維護者 bnoordhuis 的理由是,第二次調用 uvaccept 時有 99.9% 的概念會返回 EAGAIN,那就是沒有更多的連接可以處理,這樣額外調用 uvaccept 帶來的系統(tǒng)調用開銷是比較可觀的。
接著看 connection_cb,connection_cb 對應的是 C++ 層的 OnConnection。
// WrapType 為 TCPWrap,UVType 為 uv_tcp_t
template
void ConnectionWrap::OnConnection(uv_stream_t* handle, int status) {
// HandleWrap 中保存了 handle 和 TCPWrap 的關系,這里取出來使用
WrapType* wrap_data = static_cast(handle->data);
Environment* env = wrap_data->env();
Local argv[] = {
Integer::New(env->isolate(), status),
Undefined(env->isolate())
};
// 新建一個表示和客戶端通信的對象,和 JS 層執(zhí)行 new TCP 一樣
Local 當建立了新連接時,操作系統(tǒng)會新建一個 socket。同樣,在 Node.js 層,也會通過 Instantiate 函數(shù)新建一個對應的對象表示和客戶端的通信。結構如下所示。
Instantiate 代碼如下所示。
MaybeLocal新建完和對端通信的對象后,接著調用 uv_accept 消費剛才保存在 accepted_fd 中的 fd,并把對應的 fd 保存到 C++ TCPWrap 對象的 uv_tcp_t 結構體中。
int uv_accept(uv_stream_t* server, uv_stream_t* client) {
int err;
// 把 accepted_fd 保存到 client 中
uv__stream_open(client,
server->accepted_fd,
UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
// 處理了,重置該字段
server->accepted_fd = -1;
// 保證注冊了可讀事件,繼續(xù)處理新的連接
uv__io_start(server->loop, &server->io_watcher, POLLIN);
return err;
}C++ 層拿到一個新的對象并且保存了 fd 到對象后,接著回調 JS 層的 onconnection。
// clientHandle 代表一個和客戶端建立 TCP 連接的實體
function onconnection(err, clientHandle) {
const handle = this;
const self = handle.owner;
// 新建一個 socket 用于通信
const socket = new Socket({
handle: clientHandle,
allowHalfOpen: self.allowHalfOpen,
pauseOnCreate: self.pauseOnConnect
});
// 服務器的連接數(shù)加一
self._connections++;
// 觸發(fā)用戶層連接事件
self.emit('connection', socket);
}在 JS 層也會封裝一個 Socket 對象用于管理和客戶端的通信,整體的關系如下。
接著觸發(fā) connection 事件,剩下的事情就是應用層處理了,整體流程如下。
Node.js HTTP 服務器的創(chuàng)建
接著看看 HTTP 服務器的實現(xiàn)。下面是 Node.js 中創(chuàng)建服務器的例子。
const http = require('http');
http.createServer((req, res) => {
res.write('hello');
res.end();
})
.listen(3000);我們沿著 createServer 開始分析。
function createServer(opts, requestListener) {
return new Server(opts, requestListener);
}createServer 中創(chuàng)建了一個 Server 對象,來看看 Server 初始化的邏輯。
function Server(options, requestListener) {
// 可以自定義表示請求的對象和響應的對象
this[kIncomingMessage] = options.IncomingMessage || IncomingMessage;
this[kServerResponse] = options.ServerResponse || ServerResponse;
// HTTP 頭最大字節(jié)數(shù)
this.maxHeaderSize = options.maxHeaderSize;
// 允許半關閉
net.Server.call(this, { allowHalfOpen: true });
// 有請求時的回調
if (requestListener) {
this.on('request', requestListener);
}
// 服務器 socket 讀端關閉時是否允許繼續(xù)處理隊列里的響應(TCP 上有多個請求,管道化)
this.httpAllowHalfOpen = false;
// 有連接時的回調,由 net 模塊觸發(fā)
this.on('connection', connectionListener);
// 服務器下所有請求和響應的超時時間
this.timeout = 0;
// 同一個 TCP 連接上,兩個請求之前最多間隔的時間
this.keepAliveTimeout = 5000;
// HTTP 頭的最大個數(shù)
this.maxHeadersCount = null;
// 解析頭部的最長時間,防止 ddos
this.headersTimeout = 60 * 1000;
}Server 中主要做了一些字段的初始化,并且監(jiān)聽了 connection 和 request 兩個事件,當有連接到來時會觸發(fā) connection 事件,connection 事件的處理函數(shù)會調用 HTTP 解析器進行數(shù)據的解析,當解析出一個 HTTP 請求時就會觸發(fā) request 事件通知用戶。
創(chuàng)建了 Server 對象后,接著我們調用它的 listen 函數(shù)。因為 HTTP Server 繼承于 net.Server,所以執(zhí)行 HTTP Server 的 listen 函數(shù)時,其實是執(zhí)行了 net.Serve 的 listen 函數(shù),net.Server 的 listen 函數(shù)前面已經分析過,就不再分析。當有請求到來時,會觸發(fā) connection 事件,從而執(zhí)行 connectionListener。
function connectionListener(socket) {
defaultTriggerAsyncIdScope(
getOrSetAsyncId(socket), connectionListenerInternal, this, socket
);
}
// socket 表示新連接
function connectionListenerInternal(server, socket) {
// socket 所屬 server
socket.server = server;
// 分配一個 HTTP 解析器
const parser = parsers.alloc();
// 初始化解析器
parser.initialize(HTTPParser.REQUEST, ...);
// 關聯(lián)起來
parser.socket = socket;
socket.parser = parser;
const state = {
onData: null,
// 同一 TCP 連接上,請求和響應的的隊列,線頭阻塞的原理
outgoing: [],
incoming: [],
};
// 監(jiān)聽 TCP 上的數(shù)據,開始解析 HTTP 報文
state.onData = socketOnData.bind(undefined,
server,
socket,
parser,
state);
socket.on('data', state.onData);
// 解析 HTTP 頭部完成后執(zhí)行的回調
parser.onIncoming = parserOnIncoming.bind(undefined,
server,
socket,
state);
/*
如果 handle 是繼承 StreamBase 的流,則在 C++ 層解析 HTTP 請求報文,
否則使用上面的 socketOnData 函數(shù)處理 HTTP 請求報文,
TCP 模塊的 isStreamBase 為 true
*/
if (socket._handle && socket._handle.isStreamBase &&
!socket._handle._consumed) {
parser._consumed = true;
socket._handle._consumed = true;
parser.consume(socket._handle);
}
// 執(zhí)行 llhttp_execute 時的回調
parser[kOnExecute] = onParserExecute.bind(undefined,
server,
socket,
parser,
state);
}上面的 connectionListenerInternal 函數(shù)中首先分配了一個 HTTP 解析器,HTTP 解析器由以下代碼管理。
const parsers = new FreeList('parsers', 1000, function parsersCb() {
const parser = new HTTPParser();
cleanParser(parser);
parser.onIncoming = null;
// 各種鉤子毀掉
parser[kOnHeaders] = parserOnHeaders;
parser[kOnHeadersComplete] = parserOnHeadersComplete;
parser[kOnBody] = parserOnBody;
parser[kOnMessageComplete] = parserOnMessageComplete;
return parser;
});parsers 用于管理 HTTP 解析器,它負責分配 HTTP 解析器,并且在 HTTP 解析器不再使用時緩存起來給下次使用,而不是每次都創(chuàng)建一個新的解析器。分配完 HTTP 解析器后就開始等待 TCP 上數(shù)據的到來,即 HTTP 請求報文。但是這里有一個邏輯需要注意,上面代碼中 Node.js 監(jiān)聽了 socket 的 data 事件,處理函數(shù)為 socketOnData,下面是 socketOnData 的邏輯。
function socketOnData(server, socket, parser, state, d) {
// 交給 HTTP 解析器處理,返回已經解析的字節(jié)數(shù)
const ret = parser.execute(d);
}socketOnData 調用 HTTP 解析器處理數(shù)據,這看起來沒什么問題,但是有一個邏輯我們可能會忽略掉,看一下下面的代碼。
if (socket._handle && socket._handle.isStreamBase) {
parser.consume(socket._handle);
}上面代碼中,如果 socket._handle.isStreamBase 為 true(TCP handle 的 isStreamBase 為 true),則會執(zhí)行 parser.consume(socket._handle),這個是做什么的呢?
static void Consume(const FunctionCallbackInfo& args) {
Parser* parser;
ASSIGN_OR_RETURN_UNWRAP(&parser, args.Holder());
// 解析出 C++ TCPWrap 對象
StreamBase* stream = StreamBase::FromObject(args[0].As Consume 會注冊 parser 會成為流的消費者,這個邏輯會覆蓋掉剛才的 onData 函數(shù),使得所有的數(shù)據直接由 parser 處理,看一下當數(shù)據到來時,parser 是如何處理的。
void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override {
// 解析 HTTP 協(xié)議
Local ret = Execute(buf.base, nread);
// 執(zhí)行 kOnExecute 回調
Local cb = object()->Get(env()->context(), kOnExecute).ToLocalChecked();
MakeCallback(cb.As(), 1, &ret);
} 在 OnStreamRead 中會源源不斷地把數(shù)據交給 HTTP 解析器處理并執(zhí)行 kOnExecute 回調,并且在解析的過程中,會不斷觸發(fā)對應的鉤子函數(shù)。比如解析到 HTTP 頭部時執(zhí)行 parserOnHeaders。
function parserOnHeaders(headers, url) {
// 記錄解析到的 HTTP 頭
if (this.maxHeaderPairs <= 0 ||
this._headers.length < this.maxHeaderPairs) {
this._headers = this._headers.concat(headers);
}
this._url += url;
}parserOnHeaders 會記錄解析到的 HTTP 頭,當解析完 HTTP 頭 時會調用 parserOnHeadersComplete。
function parserOnHeadersComplete(versionMajor, versionMinor, headers, method,
url, statusCode, statusMessage, upgrade,
shouldKeepAlive) {
const parser = this;
const { socket } = parser;
// 創(chuàng)建一個對象表示收到的 HTTP 請求
const ParserIncomingMessage = (socket && socket.server &&
socket.server[kIncomingMessage]) ||
IncomingMessage;
// 新建一個IncomingMessage對象
const incoming = parser.incoming = new ParserIncomingMessage(socket);
// 執(zhí)行回調
return parser.onIncoming(incoming, shouldKeepAlive);
}parserOnHeadersComplete 中創(chuàng)建了一個對象來表示收到的 HTTP 請求,接著執(zhí)行 onIncoming 函數(shù),對應的是 parserOnIncoming。
function parserOnIncoming(server, socket, state, req, keepAlive) {
// 請求入隊(待處理的請求隊列)
state.incoming.push(req);
// 新建一個表示響應的對象
const res = new server[kServerResponse](req);
/*
socket 當前已經在處理其它請求的響應,則先排隊,
否則掛載響應對象到 socket,作為當前處理的響應
*/
if (socket._httpMessage) {
state.outgoing.push(res);
} else {
res.assignSocket(socket);
}
// 響應處理完畢后,需要做一些處理
res.on('finish', resOnFinish.bind(undefined,
req,
res,
socket,
state,
server));
// 觸發(fā) request 事件說明有請求到來
server.emit('request', req, res);
}我們看到這里會觸發(fā) request 事件通知用戶有新請求到來,并傳入request和response作為參數(shù),這樣用戶就可以處理請求了。另外 Node.js 本身是不會處理 HTTP 請求體的數(shù)據,當 Node.js 解析到請求體時會執(zhí)行 kOnBody 鉤子函數(shù),對應的是 parserOnBody 函數(shù)。
function parserOnBody(b, start, len) {
// IncomingMessage 對象,即 request 對象
const stream = this.incoming;
// Pretend this was the result of a stream._read call.
if (len > 0 && !stream._dumped) {
const slice = b.slice(start, start + len);
const ret = stream.push(slice);
if (!ret)
readStop(this.socket);
}
}parserOnBody 會把數(shù)據 push 到請求對象 request 中,接著 Node.js 會觸發(fā) data 事件,所以我們可以通過以下方式獲取 body 的數(shù)據。
const server= http.createServer((request, response) => {
request.on('data', (chunk) => {
// 處理body
});
request.on('end', () => {
// body結束
});
})Node.js 的多進程服務器架構
雖然 Node.js 是單進程單線程的應用,但是我們可以創(chuàng)建多個進程來共同請求。在創(chuàng)建 HTTP 服務器時會調用 net 模塊的 listen,然后調用 listenIncluster。我們從該函數(shù)開始分析。
function listenIncluster(server, address, port, addressType,
backlog, fd, exclusive, flags) {
const serverQuery = {
address: address,
port: port,
addressType: addressType,
fd: fd,
flags,
};
cluster._getServer(server, serverQuery, listenOnMasterHandle);
function listenOnMasterHandle(err, handle) {
server._handle = handle;
server._listen2(address,
port,
addressType,
backlog,
fd,
flags);
}
}listenIncluster 函數(shù)會調用子進程 cluster 模塊的 _getServer 函數(shù)。
cluster._getServer = function(obj, options, cb) {
let address = options.address;
const message = {
act: 'queryServer',
index,
data: null,
...options
};
message.address = address;
// 給主進程發(fā)送消息
send(message, (reply, handle) => {
// 根據不同模式做處理
if (handle)
shared(reply, handle, indexesKey, cb);
else
rr(reply, indexesKey, cb);
});
};從上面代碼中可以看到,_getServer 函數(shù)會給主進程發(fā)送一個 queryServer 的請求并設置了一個回調函數(shù)??匆幌轮鬟M程是如何處理 queryServer 請求的。
function queryServer(worker, message) {
const key = `${message.address}:${message.port}:${message.addressType}:${message.fd}:${message.index}`;
let handle = handles.get(key);
if (handle === undefined) {
let address = message.address;
let constructor = RoundRobinHandle;
// 根據策略選取不同的構造函數(shù),UDP 只能使用共享模式,因為 UDP 不是基于連接的,沒有連接可以分發(fā)
if (schedulingPolicy !== SCHED_RR ||
message.addressType === 'udp4' ||
message.addressType === 'udp6') {
constructor = SharedHandle;
}
handle = new constructor(key,
address,
message.port,
message.addressType,
message.fd,
message.flags);
handles.set(key, handle);
}
handle.add(worker, (errno, reply, handle) => {
const { data } = handles.get(key);
// 返回結果給子進程
send(worker, {
errno,
key,
ack: message.seq,
data,
...reply
}, handle);
});
}queryServer 首先根據調度策略選擇構造函數(shù)并創(chuàng)建一個對象,然后執(zhí)行該對象的 add 方法并且傳入一個回調。下面看看不同策略下的處理。
共享模式
首先看看共享模式的實現(xiàn),共享模式對應前面分析的主進程管理子進程,多個子進程共同 accept 處理連接這種方式。
function SharedHandle(key, address, port, addressType, fd, flags) {
this.key = key;
this.workers = [];
this.handle = null;
this.errno = 0;
let rval;
if (addressType === 'udp4' || addressType === 'udp6')
rval = dgram._createSocketHandle(address,
port,
addressType,
fd,
flags);
else
rval = net._createServerHandle(address,
port,
addressType,
fd,
flags);
if (typeof rval === 'number')
this.errno = rval;
else
this.handle = rval;
}SharedHandle 是共享模式,即主進程創(chuàng)建好 handle,交給子進程處理,接著看它的 add 函數(shù)。
SharedHandle.prototype.add = function(worker, send) {
this.workers.push(worker);
send(this.errno, null, this.handle);
};SharedHandle 的 add 把 SharedHandle 中創(chuàng)建的 handle 返回給子進程。接著看子進程拿到 handle 后的處理。
function shared(message, handle, indexesKey, cb) {
const key = message.key;
const close = handle.close;
handle.close = function() {
send({ act: 'close', key });
handles.delete(key);
indexes.delete(indexesKey);
// 因為是共享的,可以直接 close 掉而不會影響其它子進程等
return close.apply(handle, arguments);
};
handles.set(key, handle);
// 執(zhí)行 net 模塊的回調
cb(message.errno, handle);
}shared 函數(shù)把接收到的 handle 再回傳到調用方,即 net 模塊的 listenOnMasterHandle 函數(shù),listenOnMasterHandle 會執(zhí)行 listen 開始監(jiān)聽地址。
function setupListenHandle(address, port, addressType, backlog, fd, flags) {
// this._handle 即主進程返回的 handle
// 連接到來時的回調
this._handle.onconnection = onconnection;
this._handle[owner_symbol] = this;
const err = this._handle.listen(backlog || 511);
}這樣多個子進程就成功啟動了服務器。共享模式的核心邏輯是主進程在 _createServerHandle 創(chuàng)建 handle 時執(zhí)行 bind 綁定了地址(但沒有 listen),然后通過文件描述符傳遞的方式傳給子進程,子進程執(zhí)行 listen 的時候就不會報端口已經被監(jiān)聽的錯誤了,因為端口被監(jiān)聽的錯誤是執(zhí)行 bind 的時候返回的。邏輯如下圖所示。
看一個共享模式的使用例子。
const cluster = require('cluster');
const os = require('os');
// 設置為共享模式
cluster.schedulingPolicy = cluster.SCHED_NONE;
// 主進程 fork 多個子進程
if (cluster.isMaster) {
// 通常根據 CPU 核數(shù)創(chuàng)建多個進程 os.cpus().length
for (let i = 0; i < 3; i++) {
cluster.fork();
}
} else { // 子進程創(chuàng)建服務器
const net = require('net');
const server = net.createServer((socket) => {
socket.destroy();
console.log(`handled by process: ${process.pid}`);
});
server.listen(8080);
}輪詢模式
接著看輪詢模式,輪詢模式對應前面的主進程 accept,分發(fā)給多個子進程處理這種方式。
function RoundRobinHandle(key, address, port, addressType, fd, flags) {
this.key = key;
this.all = new Map();
this.free = [];
this.handles = [];
this.handle = null;
this.server = net.createServer(assert.fail);
if (fd >= 0)
this.server.listen({ fd });
else if (port >= 0) {
// 啟動一個服務器
this.server.listen({
port,
host: address,
ipv6Only: Boolean(flags & constants.UV_TCP_IPV6ONLY),
});
} else
this.server.listen(address); // UNIX socket path.
// 監(jiān)聽成功后,注冊 onconnection 回調,有連接到來時執(zhí)行
this.server.once('listening', () => {
this.handle = this.server._handle;
// 分發(fā)請求給子進程
this.handle.onconnection = (err, handle) => this.distribute(err, handle);
this.server._handle = null;
this.server = null;
});
}因為 RoundRobinHandle的 工作模式是主進程負責監(jiān)聽,收到連接后分發(fā)給子進程,所以 RoundRobinHandle 中直接啟動了一個服務器,當收到連接時執(zhí)行 this.distribute 進行分發(fā)。接著看一下RoundRobinHandle 的 add 函數(shù)。
RoundRobinHandle.prototype.add = function(worker, send) {
this.all.set(worker.id, worker);
const done = () => {
// send 的第三個參數(shù)是 null,說明沒有 handle
if (this.handle.getsockname) {
const out = {};
this.handle.getsockname(out);
send(null, { sockname: out }, null);
} else {
send(null, null, null); // UNIX socket.
}
this.handoff(worker);
};
// 否則等待 listen 成功后執(zhí)行回調
this.server.once('listening', done);
};RoundRobinHandle 會在 listen 成功后執(zhí)行回調。我們回顧一下執(zhí)行 add 函數(shù)時的回調。
handle.add(worker, (errno, reply, handle) => {
const { data } = handles.get(key);
send(worker, {
errno,
key,
ack: message.seq,
data,
...reply
}, handle);
});回調函數(shù)會把 handle 等信息返回給子進程。但是在 RoundRobinHandle 和 SharedHandle 中返回的 handle 是不一樣的,分別是 null 和 net.createServer 實例,因為前者不需要啟動一個服務器,它只需要接收來自父進程傳遞的連接就行。
接著我們回到子進程的上下文,看子進程是如何處理的,剛才我們講過,不同的調度策略,返回的 handle 是不一樣的,我們看輪詢模式下的處理。
function rr(message, indexesKey, cb) {
let key = message.key;
// 不需要 listen,空操作
function listen(backlog) {
return 0;
}
function close() {
// 因為 handle 是共享的,所以無法直接關閉,需要告訴父進程,引用數(shù)減一
if (key === undefined)
return;
send({ act: 'close', key });
handles.delete(key);
indexes.delete(indexesKey);
key = undefined;
}
// 構造假的 handle 給調用方
const handle = { close, listen, ref: noop, unref: noop };
handles.set(key, handle);
// 執(zhí)行 net 模塊的回調
cb(0, handle);
}round-robin 模式下,Node.js 會構造一個假的 handle 返回給 net 模塊,因為調用方會調用 handle 的這些函數(shù)。當有請求到來時,round-bobin 模塊會執(zhí)行 distribute 分發(fā)連接給子進程。
RoundRobinHandle.prototype.distribute = function(err, handle) {
// 首先保存 handle 到隊列
this.handles.push(handle);
// 從空閑隊列獲取一個子進程
const worker = this.free.shift();
// 分發(fā)
if (worker)
this.handoff(worker);
};
RoundRobinHandle.prototype.handoff = function(worker) {
// 拿到一個 handle
const handle = this.handles.shift();
// 沒有 handle,則子進程重新入隊
if (handle === undefined) {
this.free.push(worker);
return;
}
// 通知子進程有新連接
const message = { act: 'newconn', key: this.key };
sendHelper(worker.process, message, handle, (reply) => {
// 接收成功
if (reply.accepted)
handle.close();
else
// 結束失敗,則重新分發(fā)
this.distribute(0, handle);
// 繼續(xù)分發(fā)
this.handoff(worker);
});
};可以看到 Node.js 沒用按照嚴格的輪詢,而是哪個進程接收連接快,就繼續(xù)給它分發(fā)。接著看一下子進程是怎么處理該請求的。
function onmessage(message, handle) {
if (message.act === 'newconn')
onconnection(message, handle);
}
function onconnection(message, handle) {
const key = message.key;
const server = handles.get(key);
const accepted = server !== undefined;
// 回復接收成功
send({ ack: message.seq, accepted });
if (accepted)
// 在 net 模塊設置
server.onconnection(0, handle);
}最終執(zhí)行 server.onconnection 進行連接的處理。邏輯如下圖所示。
看一下輪詢模式的使用例子。
const cluster = require('cluster');
const os = require('os');
// 設置為輪詢模式
cluster.schedulingPolicy = cluster.SCHED_RR;
// 主進程 fork 多個子進程
if (cluster.isMaster) {
// 通常根據 CPU 核數(shù)創(chuàng)建多個進程 os.cpus().length
for (let i = 0; i < 3; i++) {
cluster.fork();
}
} else { // 子進程創(chuàng)建服務器
const net = require('net');
const server = net.createServer((socket) => {
socket.destroy();
console.log(`handled by process: ${process.pid}`);
});
server.listen(8080);
}實現(xiàn)一個高性能的服務器是非常復雜的,涉及到很多復雜的知識,但是即使不是服務器開發(fā)者,了解服務器相關的一些知識也是非常有用的。
本文題目:Node.js是如何處理請求的
分享URL:http://www.5511xx.com/article/ccdcdsj.html


咨詢
建站咨詢
