日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關咨詢
選擇下列產品馬上在線溝通
服務時間:8:30-17:00
你可能遇到了下面的問題
關閉右側工具欄

新聞中心

這里有您想知道的互聯(lián)網營銷解決方案
一文帶你用80行代碼實現簡易RxJS

RxJS 是一個響應式的庫,它接收從事件源發(fā)出的一個個事件,經過處理管道的層層處理之后,傳入最終的接收者,這個處理管道是由操作符組成的,開發(fā)者只需要選擇和組合操作符就能完成各種異步邏輯,極大簡化了異步編程。除此以外,RxJS 的設計還遵循了函數式、流的理念。

成都創(chuàng)新互聯(lián)服務項目包括如皋網站建設、如皋網站制作、如皋網頁制作以及如皋網絡營銷策劃等。多年來,我們專注于互聯(lián)網行業(yè),利用自身積累的技術優(yōu)勢、行業(yè)經驗、深度合作伙伴關系等,向廣大中小型企業(yè)、政府機構等提供互聯(lián)網行業(yè)的解決方案,如皋網站推廣取得了明顯的社會效益與經濟效益。目前,我們服務的客戶以成都為中心已經輻射到如皋省份的部分城市,未來相信會繼續(xù)擴大服務區(qū)域并繼續(xù)獲得客戶的支持與信任!

直接講概念比較難理解,不如我們實現一個簡易的 RxJS 再來看這些。

RxJS 的使用

RxJS 會對事件源做一層封裝,叫做 Observable,由它發(fā)出一個個事件。

比如這樣:

const source = new Observable((observer) => {
let i = 0;
setInterval(() => {
observer.next(++i);
}, 1000);
});

在回調函數里面設置一個定時器,不斷通過 next 傳入事件。

這些事件會被接受者監(jiān)聽,叫做 Observer。

const subscription = source.subscribe({
next: (v) => console.log(v),
error: (err) => console.error(err),
complete: () => console.log('complete'),
});

observer 可以接收 next 傳過來的事件,傳輸過程中可能有 error,也可以在這里處理 error,還可以處理傳輸完成的事件。

這樣的一個監(jiān)聽或者說訂閱,叫做 Subscription。

可以訂閱當然也可以取消訂閱:

subscription.unsubscribe();

取消訂閱時的回調函數是在 Observable 里返回的:

const source = new Observable((observer) => {
let i = 0;
const timer = setInterval(() => {
observer.next(++i);
}, 1000);
return function unsubscribe() {
clearInterval(timer);
};
});

發(fā)送事件、監(jiān)聽事件只是基礎,處理事件的過程才是 RxJS 的精髓,它設計了管道的概念,可以用操作符 operator 來組裝這個管道:

source.pipe(
map((i) => ++i),
map((i) => i * 10)
).subscribe(() => {
//...
})

事件經過管道之后才會傳到 Observer,在傳輸過程中會經過一個個操作符的處理。

比如這里的處理邏輯是,對傳過來的數據加 1,然后再乘以 10。

綜上,使用 RxJS 的代碼就是這樣的:

const source = new Observable((observer) => {
let i = 0;
const timer = setInterval(() => {
observer.next(++i);
}, 1000);
return function unsubscribe() {
clearInterval(timer);
};
});
const subscription = source.pipe(
map((i) => ++i),
map((i) => i * 10)
).subscribe({
next: (v) => console.log(v),
error: (err) => console.error(err),
complete: () => console.log('complete'),
});

setTimeout(() => {
subscription.unsubscribe();
}, 4500);

我們通過 Observable 創(chuàng)建了一個事件源,每秒發(fā)出一個事件,這些事件會經過管道的處理再傳遞給 Observer,管道的組成是兩個 map 操作符,對數據做了 + 1 和 * 10 的處理。

Observer 接收到傳遞過來的數據,做了打印,還對錯誤和結束時的事件做了處理。此外,Observable 提供了取消訂閱時的處理邏輯,當我們在 4.5s 取消訂閱時,就可以清除定時器。

使用 RxJS 基本就是這個流程,那它是怎么實現的呢?

80 行代碼實現

RxJS先從事件源開始,實現 Observable:

觀察下它的特點:

  1. 它接收一個回調函數,里面可以調用 next 來傳輸數據。
  2. 它有 subscribe 方法可以用來添加 Observer 的訂閱,返回 subscription
  3. 它可以在回調函數里返回 unsbscribe 時的處理邏輯
  4. 它有 pipe 方法可以傳入操作符

我們按照這些特點來實現下:

首先,Observable 的構造函數要接收回調函數 _subscribe,但是不是立刻調用,而是在 subscribe 的時候才調用:

class Observable {
constructor(_subscribe) {
this._subscribe = _subscribe;
}
subscribe() {
this._subscribe();
}
}

回調函數的參數是有 next、error、complete 方法的對象,用于傳遞事件:

class Observable {
constructor(_subscribe) {
this._subscribe = _subscribe;
}
subscribe(observer) {
const subscriber = new Subscriber(observer);
this._subscribe(subscriber);
}
}

class Subscriber{
constructor(observer) {
super();
this.observer = observer;
this.isStopped = false;
}
next(value) {
if (this.observer.next && !this.isStopped) {
this.observer.next(value);
}
}
error(value) {
this.isStopped = true;
if (this.observer.error) {
this.observer.error(value);
}
}
complete() {
this.isStopped = true;
if (this.observer.complete) {
this.observer.complete();
}
if (this.unsubscribe) {
this.unsubscribe();
}
}
}

這樣,在回調函數里面就可以調用 next、error、complete 方法了:

此外,回調函數的返回值是 unsbscribe 時的處理邏輯,要收集起來,在取消訂閱時調用:

class Subscription {
constructor() {
this._teardowns = [];
}
unsubscribe() {
this._teardowns.forEach((teardown) => {
typeof teardown === 'function' ? teardown() : teardown.unsubscribe()
});
}
add(teardown) {
if (teardown) {
this._teardowns.push(teardown);
}
}
}

提供 unsubscribe 方法用于取消訂閱,_teardowns 用于收集所有的取消訂閱時的回調,在 unsubscribe 時調用所有 teardown 回調。

這段邏輯比較通用,可以作為 Subscriber 的父類。

然后,在 Observable 里調用 add 來添加 teardown,并且返回 subscription(它有 unsubscribe 方法):

class Observable {
constructor(_subscribe) {
this._subscribe = _subscribe;
}
subscribe(observer) {
const subscriber = new Subscriber(observer);
subscriber.add(this._subscribe(subscriber));
return subscriber;
}
}
class Subscriber extends Subscription {
constructor(observer) {
super();
this.observer = observer;
this.isStopped = false;
}
next(value) {
if (this.observer.next && !this.isStopped) {
this.observer.next(value);
}
}
error(value) {
this.isStopped = true;
if (this.observer.error) {
this.observer.error(value);
}
}
complete() {
this.isStopped = true;
if (this.observer.complete) {
this.observer.complete();
}
if (this.unsubscribe) {
this.unsubscribe();
}
}
}

class Subscription {
constructor() {
this._teardowns = [];
}
unsubscribe() {
this._teardowns.forEach((teardown) => {
typeof teardown === 'function' ? teardown() : teardown.unsubscribe()
});
}
add(teardown) {
if (teardown) {
this._teardowns.push(teardown);
}
}
}

這樣,我們就實現了 Observable 和 Observer,只寫了 50 行代碼。先來測試下:

const source = new Observable((observer) => {
let i = 0;
const timer = setInterval(() => {
observer.next(++i);
}, 1000);
return function unsubscribe() {
clearInterval(timer);
};
});
const subscription = source.subscribe({
next: (v) => console.log(v),
error: (err) => console.error(err),
complete: () => console.log('complete'),
});

setTimeout(() => {
subscription.unsubscribe();
}, 4500);

Observer 監(jiān)聽到了 Observable 傳遞過來的 1、2、3、4 的數據,因為在 4.5s 時取消了訂閱,所以后面就不再有數據了。

我們用 50 行實現了基礎的 RxJS!

當然,最精髓的 operator 還沒有實現,接下來繼續(xù)完善。

我們給 Observable 添加 pipe 方法,它會調用傳入的 operator,并且上個的結果是下個的輸入,這樣就串起來了,也就是管道的概念:

class Observable {
constructor(_subscribe) {
//...
}
subscribe(observer) {
//...
}
pipe(...operations) {
return pipeFromArray(operations)(this);
}
}
function pipeFromArray(fns) {
if (fns.length === 0) {
return (x) => x;
}
if (fns.length === 1) {
return fns[0];
}
return (input) => {
return fns.reduce((prev, fn) => fn(prev), input);
};
}

當傳入的參數是 0 個的時候,就直接返回之前的 Observable,1 個的時候直接返回,否則就通過 reduce 的方式串聯(lián)起來,組成管道。

operator 的實現就是監(jiān)聽上一個 Observable,返回一個新的。

比如 map 的實現,就是傳入 project 對 value 做處理,把結果用 next 傳下去:

function map(project) {
return (observable) => new Observable((subscriber) => {
const subcription = observable.subscribe({
next(value) {
return subscriber.next(project(value));
},
error(err) {
subscriber.error(err);
},
complete() {
subscriber.complete();
},
});
return subcription;
});
}

這樣我們就實現了 operator,來測試下:

我們調用了 pipe 方法,使用兩個 map 操作符來組織處理流程,對數據做了 +1 和 *10 的處理。

所以,Observable 傳遞過來的 1、2、3、4 傳遞給 Observer 的時候就變成了 20、30、40、50。

至此,我們實現了 RxJS 的 Observable、Observer、Subscription、operator 等概念,是一個簡易版 RxJS 了。只用了 80 行代碼。

再來看最開始的那些理念:

為什么叫做響應式呢?

因為是對事件源做監(jiān)聽和一系列處理的,這種編程模式就叫做響應式。

為什么叫函數式呢?

因為每一步 operator 都是純函數,返回一個新的 Observable,這符合函數式的不可變,修改后返回一個新的的理念。

為什么叫流呢?

因為一個個事件是動態(tài)產生和傳遞的,這種數據的動態(tài)產生和傳遞就可以叫做流。

完整代碼如下:

function pipeFromArray(fns) {
if (fns.length === 0) {
return (x) => x;
}
if (fns.length === 1) {
return fns[0];
}
return (input) => {
return fns.reduce((prev, fn) => fn(prev), input);
};
}
class Subscription {
constructor() {
this._teardowns = [];
}
unsubscribe() {
this._teardowns.forEach((teardown) => {
typeof teardown === 'function' ? teardown() : teardown.unsubscribe()
});
}
add(teardown) {
if (teardown) {
this._teardowns.push(teardown);
}
}
}
class Subscriber extends Subscription {
constructor(observer) {
super();
this.observer = observer;
this.isStopped = false;
}
next(value) {
if (this.observer.next && !this.isStopped) {
this.observer.next(value);
}
}
error(value) {
this.isStopped = true;
if (this.observer.error) {
this.observer.error(value);
}
}
complete() {
this.isStopped = true;
if (this.observer.complete) {
this.observer.complete();
}
if (this.unsubscribe) {
this.unsubscribe();
}
}
}
class Observable {
constructor(_subscribe) {
this._subscribe = _subscribe;
}
subscribe(observer) {
const subscriber = new Subscriber(observer);
subscriber.add(this._subscribe(subscriber));
return subscriber;
}
pipe(...operations) {
return pipeFromArray(operations)(this);
}
}
function map(project) {
return (observable) => new Observable((subscriber) => {
const subcription = observable.subscribe({
next(value) {
return subscriber.next(project(value));
},
error(err) {
subscriber.error(err);
},
complete() {
subscriber.complete();
},
});
return subcription;
});
}
const source = new Observable((observer) => {
let i = 0;
const timer = setInterval(() => {
observer.next(++i);
}, 1000);
return function unsubscribe() {
clearInterval(timer);
};
});
const subscription = source.pipe(
map((i) => ++i),
map((i) => i * 10)
).subscribe({
next: (v) => console.log(v),
error: (err) => console.error(err),
complete: () => console.log('complete'),
});
setTimeout(() =>
網站題目:一文帶你用80行代碼實現簡易RxJS
鏈接URL:http://www.5511xx.com/article/dhejojp.html