日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務時間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
徹底掌握Node.js四大流,解決爆緩沖區(qū)的“背壓”問題

把一個東西從 A 搬到 B 該怎么搬呢?

創(chuàng)新互聯(lián)公司專業(yè)為企業(yè)提供咸寧網(wǎng)站建設、咸寧做網(wǎng)站、咸寧網(wǎng)站設計、咸寧網(wǎng)站制作等企業(yè)網(wǎng)站建設、網(wǎng)頁設計與制作、咸寧企業(yè)網(wǎng)站模板建站服務,十多年咸寧做網(wǎng)站經(jīng)驗,不只是建網(wǎng)站,更提供有價值的思路和整體網(wǎng)絡服務。

抬起來,移動到目的地,放下不就行了么。

那如果這個東西有一噸重呢?

那就一部分一部分的搬。

其實 IO 也就是搬東西,包括網(wǎng)絡的 IO、文件的 IO,如果數(shù)據(jù)量少,那么直接傳送全部內(nèi)容就行了,但如果內(nèi)容特別多,一次性加載到內(nèi)存會崩潰,而且速度也慢,這時候就可以一部分一部分的處理,這就是流的思想。

各種語言基本都實現(xiàn)了 stream 的 api,Node.js 也是,stream api 是比較常用的,下面我們就來探究一下 stream。

本文會回答以下問題:

  • Node.js 的 4 種 stream 是什么
  • 生成器如何與 Readable Stream 結(jié)合
  • stream 的暫停和流動
  • 什么是背壓問題,如何解決

Node.js 的 4種 stream

流的直觀感受

從一個地方流到另一個地方,顯然有流出的一方和流入的一方,流出的一方就是可讀流(readable),而流入的一方就是可寫流(writable)。

當然,也有的流既可以流入又可以流出,這種叫做雙工流(duplex)

既然可以流入又可以流出,那么是不是可以對流入的內(nèi)容做下轉(zhuǎn)換再流出呢,這種流叫做轉(zhuǎn)換流(transform)

duplex 流的流入和流出內(nèi)容不需要相關(guān),而 transform 流的流入和流出是相關(guān)的,這是兩者的區(qū)別。

流的 api

Node.js 提供的 stream 就是上面介紹的那 4 種:

 
 
 
 
  1. const stream = require('stream');
  2. // 可讀流
  3. const Readable = stream.Readable;
  4. // 可寫流
  5. const Writable = stream.Writable;
  6. // 雙工流
  7. const Duplex = stream.Duplex;
  8. // 轉(zhuǎn)換流
  9. const Transform = stream.Transform;

它們都有要實現(xiàn)的方法:

  • Readable 需要實現(xiàn) _read 方法來返回內(nèi)容
  • Writable 需要實現(xiàn) _write 方法來接受內(nèi)容
  • Duplex 需要實現(xiàn) _read 和 _write 方法來接受和返回內(nèi)容
  • Transform 需要實現(xiàn) _transform 方法來把接受的內(nèi)容轉(zhuǎn)換之后返回

我們分別來看一下:

Readable

Readable 要實現(xiàn) _read 方法,通過 push 返回具體的數(shù)據(jù)。

 
 
 
 
  1. const Stream = require('stream');
  2. const readableStream = Stream.Readable();
  3. readableStream._read = function() {
  4.     this.push('阿門阿前一棵葡萄樹,');
  5.     this.push('阿東阿東綠的剛發(fā)芽,');
  6.     this.push('阿東背著那重重的的殼呀,');
  7.     this.push('一步一步地往上爬。')
  8.     this.push(null);
  9. }
  10. readableStream.on('data', (data)=> {
  11.     console.log(data.toString())
  12. });
  13. readableStream.on('end', () => {
  14.     console.log('done~');
  15. });

當 push 一個 null 時,就代表結(jié)束流。

執(zhí)行效果如下:

創(chuàng)建 Readable 也可以通過繼承的方式:

 
 
 
 
  1. const Stream = require('stream');
  2. class ReadableDong extends Stream.Readable {
  3.     constructor() {
  4.         super();
  5.     }
  6.     _read() {
  7.         this.push('阿門阿前一棵葡萄樹,');
  8.         this.push('阿東阿東綠的剛發(fā)芽,');
  9.         this.push('阿東背著那重重的的殼呀,');
  10.         this.push('一步一步地往上爬。')
  11.         this.push(null);
  12.     }
  13. }
  14. const readableStream = new ReadableDong();
  15. readableStream.on('data', (data)=> {
  16.     console.log(data.toString())
  17. });
  18. readableStream.on('end', () => {
  19.     console.log('done~');
  20. });

可讀流是生成內(nèi)容的,那么很自然可以和生成器結(jié)合:

 
 
 
 
  1. const Stream = require('stream');
  2. class ReadableDong extends Stream.Readable {
  3.     constructor(iterator) {
  4.         super();
  5.         this.iterator = iterator;
  6.     }
  7.     _read() {
  8.         const next = this.iterator.next();
  9.         if(next.done) {
  10.             return this.push(null);
  11.         } else {
  12.             this.push(next.value)
  13.         }
  14.     }
  15. }
  16. function *songGenerator() {
  17.     yield '阿門阿前一棵葡萄樹,';
  18.     yield '阿東阿東綠的剛發(fā)芽,';
  19.     yield '阿東背著那重重的的殼呀,';
  20.     yield '一步一步地往上爬。';
  21. }
  22. const songIterator = songGenerator();
  23. const readableStream = new ReadableDong(songIterator);
  24. readableStream.on('data', (data)=> {
  25.     console.log(data.toString())
  26. });
  27. readableStream.on('end', () => {
  28.     console.log('done~');
  29. });

這就是可讀流,通過實現(xiàn) _read 方法來返回內(nèi)容。

Writable

Writable 要實現(xiàn) _write 方法,接收寫入的內(nèi)容。

 
 
 
 
  1. const Stream = require('stream');
  2. const writableStream = Stream.Writable();
  3. writableStream._write = function (data, enc, next) {
  4.    console.log(data.toString());
  5.    // 每秒寫一次
  6.    setTimeout(() => {
  7.        next();
  8.    }, 1000);
  9. }
  10. writableStream.on('finish', () => console.log('done~'));
  11. writableStream.write('阿門阿前一棵葡萄樹,');
  12. writableStream.write('阿東阿東綠的剛發(fā)芽,');
  13. writableStream.write('阿東背著那重重的的殼呀,');
  14. writableStream.write('一步一步地往上爬。');
  15. writableStream.end();

接收寫入的內(nèi)容,打印出來,并且調(diào)用 next 來處理下一個寫入的內(nèi)容,這里調(diào)用 next 是異步的,可以控制頻率。

跑了一下,確實可以正常的處理寫入的內(nèi)容:

這就是可寫流,通過實現(xiàn) _write 方法來處理寫入的內(nèi)容。

Duplex

Duplex 是可讀可寫,同時實現(xiàn) _read 和 _write 就可以了

 
 
 
 
  1. const Stream = require('stream');
  2. var duplexStream = Stream.Duplex();
  3. duplexStream._read = function () {
  4.     this.push('阿門阿前一棵葡萄樹,');
  5.     this.push('阿東阿東綠的剛發(fā)芽,');
  6.     this.push('阿東背著那重重的的殼呀,');
  7.     this.push('一步一步地往上爬。')
  8.     this.push(null);
  9. }
  10. duplexStream._write = function (data, enc, next) {
  11.     console.log(data.toString());
  12.     next();
  13. }
  14. duplexStream.on('data', data => console.log(data.toString()));
  15. duplexStream.on('end', data => console.log('read done~'));
  16. duplexStream.write('阿門阿前一棵葡萄樹,');
  17. duplexStream.write('阿東阿東綠的剛發(fā)芽,');
  18. duplexStream.write('阿東背著那重重的的殼呀,');
  19. duplexStream.write('一步一步地往上爬。');
  20. duplexStream.end();
  21. duplexStream.on('finish', data => console.log('write done~'));

整合了 Readable 流和 Writable 流的功能,這就是雙工流 Duplex。

Transform

Duplex 流雖然可讀可寫,但是兩者之間沒啥關(guān)聯(lián),而有的時候需要對流入的內(nèi)容做轉(zhuǎn)換之后流出,這時候就需要轉(zhuǎn)換流 Transform。

Transform 流要實現(xiàn) _transform 的 api,我們實現(xiàn)下對內(nèi)容做反轉(zhuǎn)的轉(zhuǎn)換流:

 
 
 
 
  1. const Stream = require('stream');
  2. class TransformReverse extends Stream.Transform {
  3.   constructor() {
  4.     super()
  5.   }
  6.   _transform(buf, enc, next) {
  7.     const res = buf.toString().split('').reverse().join('');
  8.     this.push(res)
  9.     next()
  10.   }
  11. }
  12. var transformStream = new TransformReverse();
  13. transformStream.on('data', data => console.log(data.toString()))
  14. transformStream.on('end', data => console.log('read done~'));
  15. transformStream.write('阿門阿前一棵葡萄樹');
  16. transformStream.write('阿東阿東綠的剛發(fā)芽');
  17. transformStream.write('阿東背著那重重的的殼呀');
  18. transformStream.write('一步一步地往上爬');
  19. transformStream.end()
  20. transformStream.on('finish', data => console.log('write done~'));

跑了一下,效果如下:

流的暫停和流動

我們從 Readable 流中獲取內(nèi)容,然后流入 Writable 流,兩邊分別做 _read 和 _write 的實現(xiàn),就實現(xiàn)了流動。

背壓

但是 read 和 write 都是異步的,如果兩者速率不一致呢?

如果 Readable 讀入數(shù)據(jù)的速率大于 Writable 寫入速度的速率,這樣就會積累一些數(shù)據(jù)在緩沖區(qū),如果緩沖的數(shù)據(jù)過多,就會爆掉,會丟失數(shù)據(jù)。

而如果 Readable 讀入數(shù)據(jù)的速率小于 Writable 寫入速度的速率呢?那沒關(guān)系,最多就是中間有段空閑時期。

這種讀入速率大于寫入速率的現(xiàn)象叫做“背壓”,或者“負壓”。也很好理解,寫入段壓力比較大,寫不進去了,會爆緩沖區(qū),導致數(shù)據(jù)丟失。

這個緩沖區(qū)大小可以通過 readableHighWaterMark 和 writableHightWaterMark 來查看,是 16k。

解決背壓

怎么解決這種讀寫速率不一致的問題呢?

當沒寫完的時候,暫停讀就行了。這樣就不會讀入的數(shù)據(jù)越來越多,駐留在緩沖區(qū)。

readable stream 有個 readableFlowing 的屬性,代表是否自動讀入數(shù)據(jù),默認為 true,也就是自動讀入數(shù)據(jù),然后監(jiān)聽 data 事件就可以拿到了。

當 readableFlowing 設置為 false 就不會自動讀了,需要手動通過 read 來讀入。

 
 
 
 
  1. readableStream.readableFlowing = false;
  2. let data;
  3. while((data = readableStream.read()) != null) {
  4.     console.log(data.toString());
  5. }

但自己手動 read 比較麻煩,我們依然可以用自動流入的方式,調(diào)用 pause 和 resume 來暫停和恢復就行了。

當調(diào)用 writable stream 的 write 方法的時候會返回一個 boolean 值代表是寫入了目標還是放在了緩沖區(qū):

  • true: 數(shù)據(jù)已經(jīng)寫入目標
  • false:目標不可寫入,暫時放在緩沖區(qū)

我們可以判斷返回 false 的時候就 pause,然后等緩沖區(qū)清空了就 resume:

 
 
 
 
  1. const rs = fs.createReadStream(src);
  2. const ws = fs.createWriteStream(dst);
  3. rs.on('data', function (chunk) {
  4.     if (ws.write(chunk) === false) {
  5.         rs.pause();
  6.     }
  7. });
  8. rs.on('end', function () {
  9.     ws.end();
  10. });
  11. ws.on('drain', function () {
  12.     rs.resume();
  13. });

這樣就能達到根據(jù)寫入速率暫停和恢復讀入速率的功能,解決了背壓問題。

pipe 有背壓問題么?

平時我們經(jīng)常會用 pipe 來直接把 Readable 流對接到 Writable 流,但是好像也沒遇到過背壓問題,其實是 pipe 內(nèi)部已經(jīng)做了讀入速率的動態(tài)調(diào)節(jié)了。

 
 
 
 
  1. const rs = fs.createReadStream(src);
  2. const ws = fs.createWriteStream(dst);
  3. rs.pipe(ws);

總結(jié)

流是傳輸數(shù)據(jù)時常見的思想,就是一部分一部分的傳輸內(nèi)容,是文件讀寫、網(wǎng)絡通信的基礎概念。

Node.js 也提供了 stream 的 api,包括 Readable 可讀流、Writable 可寫流、Duplex 雙工流、Transform 轉(zhuǎn)換流。它們分別實現(xiàn) _read、_write、_read + _write、_transform 方法,來做數(shù)據(jù)的返回和處理。

創(chuàng)建 Readable 對象既可以直接調(diào)用 Readable api 創(chuàng)建,然后重寫 _read 方法,也可以繼承 Readable 實現(xiàn)一個子類,之后實例化。其他流同理。(Readable 可以很容易的和 generator 結(jié)合)

當讀入的速率大于寫入速率的時候就會出現(xiàn)“背壓”現(xiàn)象,會爆緩沖區(qū)導致數(shù)據(jù)丟失,解決的方式是根據(jù) write 的速率來動態(tài) pause 和 resume 可讀流的速率。pipe 就沒有這個問題,因為內(nèi)部做了處理。

流是掌握 IO 繞不過去的一個概念,而背壓問題也是流很常見的問題,遇到了數(shù)據(jù)丟失可以考慮是否發(fā)生了背壓。希望這篇文章能夠幫大家理清思路,真正掌握 stream!


網(wǎng)站標題:徹底掌握Node.js四大流,解決爆緩沖區(qū)的“背壓”問題
文章分享:http://www.5511xx.com/article/coegscs.html