日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
下一代消息隊(duì)列Pulsar到底是什么?

本文轉(zhuǎn)載自微信公眾號「咖啡拿鐵」,作者咖啡拿鐵 。轉(zhuǎn)載本文請聯(lián)系咖啡拿鐵公眾號。

創(chuàng)新互聯(lián)主要從事做網(wǎng)站、網(wǎng)站建設(shè)、網(wǎng)頁設(shè)計(jì)、企業(yè)做網(wǎng)站、公司建網(wǎng)站等業(yè)務(wù)。立足成都服務(wù)茂南,十載網(wǎng)站建設(shè)經(jīng)驗(yàn),價(jià)格優(yōu)惠、服務(wù)專業(yè),歡迎來電咨詢建站服務(wù):18982081108

背景

之前琢磨了很久一直想寫一篇pulsar相關(guān)的文章,但是一直知識儲備不夠,對于很多細(xì)節(jié)還是不了解,于是查了很多資料,總算是可以湊出一篇文章了。

Pulsar是一個(gè)由yahoo公司于2016年開源的消息中間件,2018年成為Apache的頂級項(xiàng)目。在我之前的文章中寫過很多其他消息中間件的文章,比如kafka,rocketmq等等。

在開源的業(yè)界已經(jīng)有這么多消息隊(duì)列中間件了,pulsar作為一個(gè)新勢力到底有什么優(yōu)點(diǎn)呢?pulsar自從出身就不斷的再和其他的消息隊(duì)列(kafka,rocketmq等等)做比較,但是Pulsar的設(shè)計(jì)思想和大多數(shù)的消息隊(duì)列中間件都不同,具備了高吞吐,低延遲,計(jì)算存儲分離,多租戶,異地復(fù)制等功能,所以pulsar也被譽(yù)為下一代消息隊(duì)列中間件,接下來我會一一對其進(jìn)行詳細(xì)的解析。

pulsar架構(gòu)原理

整體的架構(gòu)和其他的消息隊(duì)列中間件差別不是太大,相信大家也看到了很多熟悉的名詞,接下來會給大家一一解釋這些名詞的含義。

名詞解釋

  • Producer:消息生產(chǎn)者,將消息發(fā)送到broker。
  • Consumer:消息消費(fèi)者,從Broker讀取消息到客戶端,進(jìn)行消費(fèi)處理。
  • Broker: 可以看作是pulsar的server,Producer和Consumer都看作是client.消息處理的節(jié)點(diǎn),pulsar的Broker和其他消息中間件的都不一樣,他是無狀態(tài)的沒有存儲,所以可以無限制的擴(kuò)展,這個(gè)后面也會詳解講到。
  • Bookie: 負(fù)責(zé)所有消息的持久化,這里采用的是Apache Bookeeper。
  • ZK: 和kafka一樣pulsar也是使用zk保存一些元數(shù)據(jù),比如配置管理,topic分配,租戶等等。
  • Service Discovery:可以理解為Pulsar中的nginx,只用一個(gè)url就可以和整個(gè)broker進(jìn)行打交道,當(dāng)然也可以使用自己的服務(wù)發(fā)現(xiàn)??蛻舳税l(fā)出的讀取,更新或刪除主題的初始請求將發(fā)送給可能不是處理該主題的 broker 。如果這個(gè) broker 不能處理該主題的請求,broker 將會把該請求重定向到可以處理主題請求的 broker。

不論是kafka,rocketmq還是我們的pulsar其實(shí)作為消息隊(duì)列中間件最為重要的大概就是分為三個(gè)部分:

  • Producer是如何生產(chǎn)消息,發(fā)送到對應(yīng)的Broker
  • Broker是如何處理消息,將高效的持久化以及查詢
  • Consumer是如何進(jìn)行消費(fèi)消息

而我們后面也會圍繞著這三個(gè)部分進(jìn)行展開講解。

Producer生產(chǎn)消息

先簡單看一下如何用代碼進(jìn)行消息發(fā)送:

 
 
 
 
  1. PulsarClient client = PulsarClient.create("pulsar://pulsar.us-west.example.com:6650"); 
  2.  
  3. Producer producer = client.createProducer( 
  4.                 "persistent://sample/standalone/ns1/my-topic"); 
  5.  
  6. // Publish 10 messages to the topic 
  7. for (int i = 0; i < 10; i++) { 
  8.     producer.send("my-message".getBytes()); 
  • Step1: 首先使用我們的url創(chuàng)建一個(gè)client這個(gè)url是我們service discovery的地址,如果我們使用單機(jī)模式可以進(jìn)行直連
  • Step2:我們傳入了一個(gè)類似url的參數(shù),我們只需要傳遞這個(gè)就能指定我們到底在哪個(gè)topic或者namespace下面創(chuàng)建的:
組成 含義
persistent/non-persistentPulsar 提供持久化、非持久化兩種主題,如果選擇的是非持久化主題的話,所有消息都在內(nèi)存中保存,如果broker重啟,消息將會全部丟失。如果選擇的是持久化主題,所有消息都會持久化到磁盤,重啟broker,消息也可以正常消費(fèi)。
tenant顧名思義就是租戶,pulsar最開始在雅虎內(nèi)部是作為全公司使用的中間件使用的,需要給topic指定一些層級,租戶就是其中一層,比如這個(gè)可以是一個(gè)大的部門,例如電商中臺租戶。
namespace命名空間,可以看作是第二層的層級,比如電商中臺下的訂單業(yè)務(wù)組
topic消息隊(duì)列名字
 
  • Step3: 調(diào)用send方法發(fā)送消息,這里也提供了sendAsync方法支持異步發(fā)送。

上面三個(gè)步驟中,步驟1,2屬于我們準(zhǔn)備階段,用于構(gòu)建客戶端,構(gòu)建Producer,我們真的核心邏輯在send中,那這里我先提幾個(gè)小問題,大家可以先想想在其他消息隊(duì)列中是怎么做的,然后再對比pulsar的看一下:

  • 我們調(diào)用了send之后是會立即發(fā)送嗎?
  • 如果是多partition,怎么找到我應(yīng)該發(fā)送到哪個(gè)Broker呢?

發(fā)送模式

我們上面說了send分為async和sync兩種模式,但實(shí)際上在pulsar內(nèi)部sync模式也是采用的async模式,在sync模式下模擬回調(diào)阻塞,達(dá)到同步的效果,這個(gè)在kafka中也是采用的這個(gè)模式,但是在rocketmq中,所有的send都是真正的同步,都會直接請求到broker。

基于這個(gè)模式,在pulsar和kafka中都支持批量發(fā)送,在rocketmq中是直接發(fā)送,批量發(fā)送有什么好處呢?當(dāng)我們發(fā)送的TPS特別高的時(shí)候,如果每次發(fā)送都直接和broker直連,可能會做很多的重復(fù)工作,比如壓縮,鑒權(quán),創(chuàng)建鏈接等等。比如我們發(fā)送1000條消息,那么可能會做1000次這個(gè)重復(fù)的工作,如果是批量發(fā)送的話這1000條消息合并成一次請求,相對來說壓縮,鑒權(quán)這些工作就只需要做一次。

有同學(xué)可能會問,批量發(fā)送會不會導(dǎo)致發(fā)送的時(shí)間會有一定的延誤?這個(gè)其實(shí)不需要擔(dān)心,在pulsar中默認(rèn)定時(shí)每隔1ms發(fā)送一次batch,或者當(dāng)batchsize默認(rèn)到了1000都會進(jìn)行發(fā)送,這個(gè)發(fā)送的頻率都還是很快的。

發(fā)送負(fù)載均衡

在消息隊(duì)列中通常會將topic進(jìn)行水平擴(kuò)展,在pulsar和kafka中叫做partition,在rocketmq中叫做queue,本質(zhì)上都是分區(qū),我們可以將不同分區(qū)落在不同的broker上,達(dá)到我們水平擴(kuò)展的效果。

在我們發(fā)送的時(shí)候可以自己制定選擇partition的策略,也可以使用它默認(rèn)輪訓(xùn)partition策略。當(dāng)我們選擇了partition之后,我們怎么確定哪一個(gè)partition對應(yīng)哪一個(gè)broker呢?

可以先看看下面這個(gè)圖:

  • Step1: 我們所有的信息分區(qū)映射信息在zk和broker的緩存中都有進(jìn)行存儲。
  • Step2: 我們通過查詢broker,可以獲取到分區(qū)和broker的關(guān)系,并且定時(shí)更新。
  • Step3: 在pulsar中每個(gè)分區(qū)在發(fā)送端的時(shí)候都被抽象成為一個(gè)單獨(dú)的Producer,這個(gè)和kafka,rocketmq都不一樣,在kafka里面大概就是選擇了partition之后然后再去找partition對應(yīng)的broker地址,然后進(jìn)行發(fā)送。pulsar將每一個(gè)partition都封裝成Producer,在代碼實(shí)現(xiàn)上就不需要去關(guān)注他具體對應(yīng)的是哪個(gè)broker,所有的邏輯都在producer這個(gè)代碼里面,整體來說比較干凈。

壓縮消息

消息壓縮是優(yōu)化信息傳輸?shù)氖侄沃?,我們通??匆娨恍┐笮臀募紩且砸粋€(gè)壓縮包的形式提供下載,在我們消息隊(duì)列中我們也可以用這種思想,我們將一個(gè)batch的消息,比如有1000條可能有1M的傳輸大小,但是經(jīng)過壓縮之后可能就只會有幾十kb,增加了我們和broker的傳輸效率,但是與之同時(shí)我們的cpu也帶來了損耗。Pulsar客戶端支持多種壓縮類型,如 lz4、zlib、zstd、snappy 等。

 
 
 
 
  1. client.newProducer() 
  2.     .topic(“test-topic”) 
  3.     .compressionType(CompressionType.LZ4) 
  4.     .create(); 

Broker

接下來我們來說說第二個(gè)比較重要的部分Broker,在Broker的設(shè)計(jì)中pulsar和其他所有的消息隊(duì)列差別比較大,而正是因?yàn)檫@個(gè)差別也成為了他的特點(diǎn)。

計(jì)算和存儲分離

首先我們來說說他最大的特點(diǎn):計(jì)算和存儲分離。我們在開始的說過Pulsar是下一代消息隊(duì)列,就非常得益于他這個(gè)架構(gòu)設(shè)計(jì),無論是kafka還是RocketMQ,所有的計(jì)算和存儲都放在同一個(gè)機(jī)器上,這個(gè)模式有幾個(gè)弊端:

  • 擴(kuò)展困難:當(dāng)我們需要擴(kuò)展的集群的時(shí)候,我們通常是因?yàn)閏pu或者磁盤其中一個(gè)原因影響,但是我們卻要申請一個(gè)可能cpu和磁盤配置都很好的機(jī)器,造成了資源浪費(fèi)。并且kafka這種進(jìn)行擴(kuò)展,還需要進(jìn)行遷移數(shù)據(jù),過程十分繁雜。
  • 負(fù)載不均衡:當(dāng)某些partion數(shù)據(jù)特別多的時(shí)候,會導(dǎo)致broker負(fù)載不均衡,如下面圖,如果某個(gè)partition數(shù)據(jù)特別多,那么就會導(dǎo)致某個(gè)broker(輪船)承載過多的數(shù)據(jù),但是另外的broker可能又比較空閑

pulsar計(jì)算分離架構(gòu)能夠非常好的解決這個(gè)問題:

  • 對于計(jì)算:也就是我們的broker,提供消息隊(duì)列的讀寫,不存儲任何數(shù)據(jù),無狀態(tài)對于我們擴(kuò)展非常友好,只要你機(jī)器足夠,就能隨便上。擴(kuò)容Broker往往適用于增加Consumer的吞吐,當(dāng)我們有一些大流量的業(yè)務(wù)或者活動,比如電商大促,可以提前進(jìn)行broker的擴(kuò)容。
  • 對于存儲:也就是我們的bookie,只提供消息隊(duì)列的存儲,如果對消息量有要求的,我們可以擴(kuò)容bookie,并且我們不需要遷移數(shù)據(jù),擴(kuò)容十分方便。

消息存儲

名詞解析:

上圖是bookie的讀寫架構(gòu)圖,里面有一些名詞需要先介紹一下:

  • Entry,Entry是存儲到bookkeeper中的一條記錄,其中包含Entry ID,記錄實(shí)體等。
  • Ledger,可以認(rèn)為ledger是用來存儲Entry的,多個(gè)Entry序列組成一個(gè)ledger。
  • Journal,其實(shí)就是bookkeeper的WAL(write ahead log),用于存bookkeeper的事務(wù)日志,journal文件有一個(gè)最大大小,達(dá)到這個(gè)大小后會新起一個(gè)journal文件。
  • Entry log,存儲Entry的文件,ledger是一個(gè)邏輯上的概念,entry會先按ledger聚合,然后寫入entry log文件中。同樣,entry log會有一個(gè)最大值,達(dá)到最大值后會新起一個(gè)新的entry log文件
  • Index file,ledger的索引文件,ledger中的entry被寫入到了entry log文件中,索引文件用于entry log文件中每一個(gè)ledger做索引,記錄每個(gè)ledger在entry log中的存儲位置以及數(shù)據(jù)在entry log文件中的長度。
  • MetaData Storage,元數(shù)據(jù)存儲,是用于存儲bookie相關(guān)的元數(shù)據(jù),比如bookie上有哪些ledger,bookkeeper目前使用的是zk存儲,所以在部署bookkeeper前,要先有zk集群。

整體架構(gòu)上的寫流程:

  • Step1: broker發(fā)起寫請求,首先對Journal磁盤寫入WAL,熟悉mysql的朋友知道redolog,journal和redolog作用一樣都是用于恢復(fù)沒有持久化的數(shù)據(jù)。
  • Step2: 然后再將數(shù)據(jù)寫入index和ledger,這里為了保持性能不會直接寫盤,而是寫pagecache,然后異步刷盤。
  • Step3: 對寫入進(jìn)行ack。

讀流程為:

  • Step1: 先讀取index,當(dāng)然也是先讀取cache,再走disk。
  • Step2: 獲取到index之后,根據(jù)index去entry logger中去對應(yīng)的數(shù)據(jù)

如何高效讀寫?

在kafka中當(dāng)我們的topic變多了之后,由于kafka一個(gè)topic一個(gè)文件,就會導(dǎo)致我們的磁盤IO從順序?qū)懽兂呻S機(jī)寫。在rocketMq中雖然將多個(gè)topic對應(yīng)一個(gè)寫入文件,讓寫入變成了順序?qū)?,但是我們的讀取很容易導(dǎo)致我們的pagecache被各種覆蓋刷新,這對于我們的IO的影響是非常大的。所以pulsar在讀寫兩個(gè)方面針對這些問題都做了很多優(yōu)化:

  • 寫流程:順序?qū)?+ pagecache。在寫流程中我們的所有的文件都是獨(dú)立磁盤,并且同步刷盤的只有Journal,Journal是順序?qū)懸粋€(gè)journal-wal文件,順序?qū)懶史浅8?。ledger和index雖然都會存在多個(gè)文件,但是我們只會寫入pagecache,異步刷盤,所以隨機(jī)寫不會影響我們的性能。
  • 讀流程:broker cache + bookie cache,在pulsar中對于追尾讀(tailing read)非常友好基本不會走io,一般情況下我們的consumer是會立即去拿producer發(fā)送的消息的,所以這部分在持久化之后依然在broker中作為cache存在,當(dāng)然就算broker沒有cache(比如broker是新建的),我們的bookie也會在memtable中有自己的cache,通過多重cache減少讀流程走io。

我們可以發(fā)現(xiàn)在最理想的情況下讀寫的io是完全隔離開來的,所以在Pulsar中能很容易就支持百萬級topic,而在我們的kafka和rocketmq中這個(gè)是非常困難的。

無限流式存儲

一個(gè)Topic實(shí)際上是一個(gè)ledgers流(Segment),通過這個(gè)設(shè)計(jì)所以Pulsar他并不是一個(gè)單純的消息隊(duì)列系統(tǒng),他也可以代替流式系統(tǒng),所以他也叫流原生平臺,可以替代flink等系統(tǒng)。

可以看見我們的Event Stream(topic/partition),由多個(gè)Segment存儲組成,而每個(gè)segment由entry組成,這個(gè)可以看作是我們每批發(fā)送的消息通常會看作是一個(gè)entry。

Segment可以看作是我們寫入文件的一個(gè)基本維度,同一個(gè)Segment的數(shù)據(jù)會寫在同一個(gè)文件上面,不同Segment將會是不同文件,而Segment之間的在metadata中進(jìn)行保存。

分層存儲

在kafka和rocketmq中消息是會有一定的保存時(shí)間的,因?yàn)榇疟P會有空間限制,在pulsar中也提供這個(gè)功能,但是如果你想讓自己的消息永久存儲,那么可以使用分級存儲,我們可以將一些比較老的數(shù)據(jù),定時(shí)的刷新到廉價(jià)的存儲中,比如s3,那么我們就可以無限存儲我們的消息隊(duì)列了。

數(shù)據(jù)復(fù)制

在pulsar中的數(shù)據(jù)復(fù)制和kafka,rocketmq都有很大的不同,在其他消息隊(duì)列中通常是其他副本主動同步,通常這個(gè)時(shí)間就會變得不可預(yù)測,而在pulsar采用了類似qurom協(xié)議,給一組可用的bookie池,然后并發(fā)的寫入其中的一部分bookie,只要返回部分成功(通常大于1/2)就好。

  • Ensemble Size(E)決定給定 ledger 可用的 bookie 池大小。
  • Write Quorum Size(Qw)指定 Pulsar 向其中寫入 entry 的 bookie 數(shù)量。
  • Ack Quorum Size(Qa)指定必須 ack 寫入的 bookie 數(shù)量。

采用這種并發(fā)寫的方式,會更加高效的進(jìn)行數(shù)據(jù)復(fù)制,尤其是當(dāng)數(shù)據(jù)副本比較多的時(shí)候。

Consumer

接下來我們來聊聊pulsar中最后一個(gè)比較重要的組成consumer。

訂閱模式

訂閱模式是用來定義我們的消息如何分配給不同的消費(fèi)者,不同消息隊(duì)列中間件都有自己的訂閱模式,一般我們常見的訂閱模式有:

  • 集群模式:一條消息只能被一個(gè)集群內(nèi)的消費(fèi)者所消費(fèi)。
  • 廣播模式:一條消息能被集群內(nèi)所有的消費(fèi)者消費(fèi)。

在pulsar中提供了4種訂閱模式,分別是獨(dú)占,災(zāi)備,共享,鍵共享:

  • 獨(dú)占:顧名思義只能由一個(gè)消費(fèi)者獨(dú)占,如果同一個(gè)集群內(nèi)有第二個(gè)消費(fèi)者去注冊,第二個(gè)就會失敗,這個(gè)適用于全局有序的消息。
  • 災(zāi)備:加強(qiáng)版獨(dú)占,如果獨(dú)占的那個(gè)掛了,會自動的切換到另外一個(gè)好的消費(fèi)者,但是還是只能由一個(gè)獨(dú)占。
  • 共享模式:這個(gè)模式看起來有點(diǎn)像集群模式,一條消息也是只能被一個(gè)集群內(nèi)消費(fèi)者消費(fèi),但是和rocketmq不同的是,rocketmq是以partition維度,同一個(gè)Partition的數(shù)據(jù)都會被發(fā)到一個(gè)機(jī)器上。在Pulsar中消費(fèi)不會以partition維度,而是輪訓(xùn)所有消費(fèi)者進(jìn)行消息發(fā)送。這有個(gè)什么好處呢?如果你有100臺機(jī)器,但是你只有10個(gè)partition其實(shí)你只有10臺消費(fèi)者能運(yùn)轉(zhuǎn),但是在pulsar中100臺機(jī)器都可以進(jìn)行消費(fèi)處理。
  • 鍵共享:類似上面說的partition維度去發(fā)送,在rocketmq中同一個(gè)key的順序消息都會被發(fā)送到一個(gè)partition,但是這里不會有partition維度,而只是按照key的hash去分配到固定的consumer,也解決了消費(fèi)者能力限制于partition個(gè)數(shù)問題。

消息獲取模式

不論是在kafka還是在rocketmq中我們都是client定時(shí)輪訓(xùn)我們的broker獲取消息,這種模式叫做長輪訓(xùn)(Long-Polling)模式。這種模式有一個(gè)缺點(diǎn)網(wǎng)絡(luò)開銷比較大,我們來計(jì)算一下consumer被消費(fèi)的時(shí)延,我們假設(shè)broker和consumer之間的一次網(wǎng)絡(luò)延時(shí)為R,那么我們總共的時(shí)間為:

  • 當(dāng)某一條消息A剛到broker的,這個(gè)時(shí)候long-polling剛好打包完數(shù)據(jù)返回,broker返回到consumer這個(gè)時(shí)間為R。
  • consumer又再次發(fā)送request請求,這個(gè)又為R。
  • 將我們的消息A返回給consumer這里又為R。

如果只考慮網(wǎng)絡(luò)時(shí)延,我們可以看見我們這條消息的消費(fèi)時(shí)延大概是3R,所以我們必須想點(diǎn)什么對其進(jìn)行一些優(yōu)化,有同學(xué)可能馬上就能想到,我們消息來了直接推送給我們的consumer不就對了,這下我們的時(shí)延只會有一次R,這個(gè)就是我們常見的推模式,但是簡單的推模式是有問題的,如果我們有生產(chǎn)速度遠(yuǎn)遠(yuǎn)大于消費(fèi)速度,那么推送的消息肯定會干爆我們的內(nèi)存,這個(gè)就是背壓。那么我們怎么解決背壓呢?我們就可以優(yōu)化推送方式,將其變?yōu)閯討B(tài)推送,我們結(jié)合Long-polling,在long-polling請求時(shí)將Buffer剩余空間告知給Broker,由Broker負(fù)責(zé)推送數(shù)據(jù)。此時(shí)Broker知道最多可以推送多少條數(shù)據(jù),那么就可以控制推送行為,不至于沖垮Consumer。

舉個(gè)例子:

Consumer發(fā)起請求時(shí)Buffer剩余容量為100,Broker每次最多返回32條消息,那么Consumer的這次long-polling請求Broker將在執(zhí)行3次push(共push96條消息)之后返回response給Consumer(response包含4條消息)。

如果采用long-polling模型,Consumer每發(fā)送一次請求Broker執(zhí)行一次響應(yīng),這個(gè)例子需要進(jìn)行4次long-polling交互(共4個(gè)request和4個(gè)response,8次網(wǎng)絡(luò)操作;Dynamic Push/Pull中是1個(gè)request,三次push和一個(gè)response,共5次網(wǎng)絡(luò)操作)。

所以pulsar就采用了這種消息獲取模式,從consumer層進(jìn)一步優(yōu)化消息達(dá)到時(shí)間。我覺得這個(gè)設(shè)計(jì)非常巧妙,很多中間件的這種long-polling模式都可以參考這種思想去做一個(gè)改善。

總結(jié)

Apache Pulsar很多設(shè)計(jì)思想都和其他中間件不一樣,但無疑于其更加貼近于未來,大膽預(yù)測一下其他的一些消息中間件未來的發(fā)展也都會向其靠攏,目前國內(nèi)的Pulsar使用者也是越來越多,騰訊云提供了pulsar的云版本TDMQ,當(dāng)然還有一些其他的知名公司華為,知乎,虎牙等等有都在對其做一個(gè)逐步的嘗試,我相信pulsar真的是一個(gè)趨勢。最后也讓我想起了最近大江大河大結(jié)局的一句話:

所有的變化,都可能伴隨著痛苦和彎路,開放的道路,也不會是闊野坦途,但大江大河,奔涌向前的趨勢,不是任何險(xiǎn)灘暗礁,能夠阻擋的。道之所在,雖千萬人吾往矣。


網(wǎng)站名稱:下一代消息隊(duì)列Pulsar到底是什么?
新聞來源:http://www.5511xx.com/article/dhcdoip.html