日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
自適應(yīng)批作業(yè)調(diào)度器:為Flink批作業(yè)自動(dòng)推導(dǎo)并行度

?01引言

對(duì)大部分用戶來說,為 Flink 算子配置合適的并行度并不是一件容易的事。對(duì)于批作業(yè),小的并行度會(huì)導(dǎo)致作業(yè)運(yùn)行時(shí)間長(zhǎng),故障恢復(fù)慢,而不必要的大并行度會(huì)導(dǎo)致資源浪費(fèi),任務(wù)部署和數(shù)據(jù) shuffle 開銷也會(huì)變大。

成都創(chuàng)新互聯(lián)公司長(zhǎng)期為近1000家客戶提供的網(wǎng)站建設(shè)服務(wù),團(tuán)隊(duì)從業(yè)經(jīng)驗(yàn)10年,關(guān)注不同地域、不同群體,并針對(duì)不同對(duì)象提供差異化的產(chǎn)品和服務(wù);打造開放共贏平臺(tái),與合作伙伴共同營造健康的互聯(lián)網(wǎng)生態(tài)環(huán)境。為蛟河企業(yè)提供專業(yè)的網(wǎng)站制作、做網(wǎng)站,蛟河網(wǎng)站改版等技術(shù)服務(wù)。擁有10多年豐富建站經(jīng)驗(yàn)和眾多成功案例,為您定制開發(fā)。

為了控制批作業(yè)的執(zhí)行時(shí)長(zhǎng),算子的并行度應(yīng)該和其需要處理的數(shù)據(jù)量成正比。用戶需要通過預(yù)估算子需要處理的數(shù)據(jù)量來配置并行度。但準(zhǔn)確預(yù)估算子需要處理的數(shù)據(jù)量是一件很困難的事情:需要處理的數(shù)據(jù)量可能每天都在變化,作業(yè)中可能會(huì)存在大量的 UDF 和復(fù)雜算子導(dǎo)致難以判斷其產(chǎn)出的數(shù)據(jù)量。

為了解決這個(gè)問題,我們?cè)?Flink 1.15 中引入了一種新的調(diào)度器:自適應(yīng)批作業(yè)調(diào)度器(Adaptive Batch Scheduler)。自適應(yīng)批作業(yè)調(diào)度器會(huì)在作業(yè)運(yùn)行時(shí)根據(jù)每個(gè)算子需要處理的實(shí)際數(shù)據(jù)量來自動(dòng)推導(dǎo)并行度。它會(huì)帶來以下好處:

  1. 大大降低批處理作業(yè)并發(fā)度調(diào)優(yōu)的繁瑣程度;
  2. 可以根據(jù)處理的數(shù)據(jù)量為不同的算子配置不同的并行度,這對(duì)于之前只能配置全局并行度的 SQL 作業(yè)尤其有益;
  3. 可以更好的適應(yīng)每日變化的數(shù)據(jù)量。

02用法

使 Flink 自動(dòng)推導(dǎo)算子的并行度,需要進(jìn)行以下配置:

  1. 啟用自適應(yīng)批作業(yè)調(diào)度器;
  2. 配置算子的并行度為 -1。

2.1 啟用自適應(yīng)批作業(yè)調(diào)度器

啟用自適應(yīng)批作業(yè)調(diào)度器,需要進(jìn)行以下配置:

  1. 配置 jobmanager.scheduler: AdaptiveBatch;
  2. 將 execution.batch-shuffle-mode 配置為 ALL-EXCHANGES-BLOCKING (默認(rèn)值)。因?yàn)槟壳白赃m應(yīng)批作業(yè)調(diào)度器只支持 shuffle mode 為 ALL-EXCHANGES-BLOCKING 的作業(yè)。

此外,還有一些相關(guān)配置來指定自動(dòng)推導(dǎo)的算子并行度的上下限、預(yù)期每個(gè)算子處理的數(shù)據(jù)量以及 source 算子的默認(rèn)并行度,詳情請(qǐng)參閱 Flink 文檔 [1]。

2.2 配置算子的并行度為 -1

自適應(yīng)批作業(yè)調(diào)度器只會(huì)為用戶未指定并行度的算子(即并行度為默認(rèn)值 -1)推導(dǎo)并行度。所以需要進(jìn)行以下配置:

  1. 配置 parallelism.default: -1;
  2. 對(duì)于 SQL 作業(yè),需要配置 table.exec.resource.default-parallelism: -1;
  3. 對(duì)于 DataStream/DataSet 作業(yè),避免在作業(yè)中通過算子的 setParallelism() 方法來指定并行度;
  4. 對(duì)于 DataStream/DataSet 作業(yè),避免在作業(yè)中通過 StreamExecutionEnvironment/ExecutionEnvironment 的 setParallelism() 方法來指定并行度。

03實(shí)現(xiàn)細(xì)節(jié)

接下來我們將介紹自適應(yīng)批作業(yè)調(diào)度器的實(shí)現(xiàn)細(xì)節(jié)。在此之前,我們簡(jiǎn)要介紹一下涉及到的一些術(shù)語概念:

  1. 邏輯節(jié)點(diǎn)(JobVertex)[2] 和邏輯拓?fù)洌↗obGraph)[3]:邏輯節(jié)點(diǎn)是為了更優(yōu)的性能而將幾個(gè)算子鏈接到一起形成的算子鏈,邏輯拓?fù)鋭t是多個(gè)邏輯節(jié)點(diǎn)連接組成的數(shù)據(jù)流圖。
  2. 執(zhí)行節(jié)點(diǎn)(ExecutionVertex)[4] 和執(zhí)行拓?fù)洌‥xecutionGraph)[5]:執(zhí)行節(jié)點(diǎn)對(duì)應(yīng)一個(gè)可部署物理任務(wù),是邏輯節(jié)點(diǎn)根據(jù)并行度進(jìn)行展開生成的。例如,如果一個(gè)邏輯節(jié)點(diǎn)的并行度為 100,就會(huì)生成 100 個(gè)對(duì)應(yīng)的執(zhí)行節(jié)點(diǎn)。執(zhí)行拓?fù)鋭t是所有執(zhí)行節(jié)點(diǎn)連接組成的物理執(zhí)行圖。

以上概念的介紹可以參見 Flink 文檔 [6]。需要注意的是,自適應(yīng)批作業(yè)調(diào)度器是通過推導(dǎo)邏輯節(jié)點(diǎn)的并行度來決定該節(jié)點(diǎn)包含的算子的并行度的。

實(shí)現(xiàn)細(xì)節(jié)主要包括以下幾部分:

  1. 使調(diào)度器能夠收集執(zhí)行節(jié)點(diǎn)產(chǎn)出數(shù)據(jù)的大小;
  2. 引入一個(gè)新組件 VertexParallelismDecider [7] 來負(fù)責(zé)根據(jù)邏輯節(jié)點(diǎn)需要處理的數(shù)據(jù)量計(jì)算其并行度;
  3. 支持動(dòng)態(tài)構(gòu)建執(zhí)行拓?fù)?,即?zhí)行拓?fù)鋸囊粋€(gè)空的執(zhí)行拓?fù)溟_始,然后隨著作業(yè)調(diào)度逐漸添加執(zhí)行節(jié)點(diǎn);
  4. 引入自適應(yīng)批作業(yè)調(diào)度器來更新和調(diào)度執(zhí)行拓?fù)洹?/li>

后續(xù)章節(jié)會(huì)對(duì)以上內(nèi)容進(jìn)行詳細(xì)介紹。

圖 1 - 自動(dòng)推導(dǎo)并行度的整體結(jié)構(gòu)

3.1 收集執(zhí)行節(jié)點(diǎn)產(chǎn)出的數(shù)據(jù)量

自適應(yīng)批作業(yè)調(diào)度器是根據(jù)邏輯節(jié)點(diǎn)需要處理的數(shù)據(jù)量來決定其并行度的,因此需要收集上游節(jié)點(diǎn)產(chǎn)出的數(shù)據(jù)量。為此,我們引入了一個(gè) numBytesProduced 計(jì)數(shù)器來記錄每個(gè)執(zhí)行節(jié)點(diǎn)產(chǎn)出的數(shù)據(jù)分區(qū)(ResultPartition)的數(shù)據(jù)量,并在執(zhí)行節(jié)點(diǎn)運(yùn)行完成時(shí)將累計(jì)值發(fā)送給調(diào)度器。

3.2 為邏輯節(jié)點(diǎn)決定合適的并行度

我們引入了一個(gè)新組件 VertexParallelismDecider 來負(fù)責(zé)為邏輯節(jié)點(diǎn)計(jì)算并行度。計(jì)算算法如下:

假設(shè)

  1. V 是用戶配置的期望每個(gè)執(zhí)行節(jié)點(diǎn)處理的數(shù)據(jù)量;
  2. totalBytenon-broadcast 是邏輯節(jié)點(diǎn)需要處理的非廣播數(shù)據(jù)的總量;
  3. totalBytesbroadcast 是邏輯節(jié)點(diǎn)需要處理的廣播數(shù)據(jù)的總量;
  4. maxBroadcastRatio 是每個(gè)執(zhí)行節(jié)點(diǎn)處理的廣播數(shù)據(jù)的比例上限;
  5. normalize(x) 是一個(gè)輸出與 x 最接近的 2 的冪的函數(shù)。

計(jì)算并行度的公式如下:

值得注意的是,我們?cè)谶@個(gè)公式中引入了兩個(gè)特殊處理:

  1. 限制每個(gè)執(zhí)行節(jié)點(diǎn)處理的廣播數(shù)據(jù)的比例;
  2. 將并行度調(diào)整為 2 的冪。

此外,上述公式不能直接用來決定 source 節(jié)點(diǎn)的并行度,因?yàn)?source 節(jié)點(diǎn)不會(huì)消費(fèi)數(shù)據(jù)。為了解決這個(gè)問題,我們引入了配置選項(xiàng) jobmanager.adaptive-batch-scheduler.default-source-parallelism,允許用戶手動(dòng)配置 source 節(jié)點(diǎn)的并行度。請(qǐng)注意,并非所有 source 都需要此選項(xiàng),因?yàn)槟承?source 可以自己推導(dǎo)并行度(例如,HiveTableSource,詳情請(qǐng)參閱 HiveParallelismInference),對(duì)于這些source,更推薦由它們自己推導(dǎo)并行度。

3.2.1 限制每個(gè)執(zhí)行節(jié)點(diǎn)處理的廣播數(shù)據(jù)的比例

我們?cè)诠较拗泼總€(gè)執(zhí)行節(jié)點(diǎn)處理的廣播數(shù)據(jù)上限比例為 maxBroadcastRatio。 即每個(gè)執(zhí)行節(jié)點(diǎn)處理的非廣播數(shù)據(jù)至少為 (1-maxBroadcastRatio) * V。如果不這樣做,當(dāng)廣播數(shù)據(jù)的數(shù)據(jù)量接近 V 時(shí),即使非廣播數(shù)據(jù)的量非常小,也可能會(huì)被計(jì)算出很大的并行度,這是不必要的,會(huì)導(dǎo)致資源浪費(fèi)和任務(wù)部署的開銷變大。

通常情況下,一個(gè)執(zhí)行節(jié)點(diǎn)需要處理的廣播數(shù)據(jù)量會(huì)小于要處理的非廣播數(shù)據(jù)。 因此,我們將 maxBroadcastRatio 默認(rèn)設(shè)置為 0.5。目前,這個(gè)值是硬編碼在代碼中的,我們后續(xù)會(huì)考慮將其改為可配置的。

3.2.2 將并行度調(diào)整為 2 的冪

normalize 函數(shù)會(huì)將并行度調(diào)整為最近的 2 的冪,這樣做是為了避免引入數(shù)據(jù)傾斜。為了更好的理解本節(jié),我們建議您先閱讀子分區(qū)動(dòng)態(tài)映射部分。

以圖 4(b)為例,A1/A2 產(chǎn)生 4 個(gè)子分區(qū),B 最終被決定的并行度為 3。這種情況下,B1 將消費(fèi) 1 個(gè)子分區(qū),B2 將消費(fèi) 1 個(gè)子分區(qū),B3 將消費(fèi) 2 個(gè)子分區(qū)。我們假設(shè)不同子分區(qū)的數(shù)據(jù)量都相同,這樣 B3 需要消費(fèi)的數(shù)據(jù)量是 B1/B2 的 2 倍,從而導(dǎo)致了數(shù)據(jù)傾斜。

為了解決這個(gè)問題,我們需要讓所有下游執(zhí)行節(jié)點(diǎn)消費(fèi)的子分區(qū)數(shù)量都一樣,也就是說上游產(chǎn)出的子分區(qū)數(shù)量應(yīng)該是下游邏輯節(jié)點(diǎn)并行度的整數(shù)倍。為簡(jiǎn)單起見,我們希望用戶指定的最大并行度為 2^N(如果不是則會(huì)被自動(dòng)調(diào)整到不超過配置值的 2^N),然后將下游邏輯節(jié)點(diǎn)的并行度調(diào)整到最接近的 2^M(M <= N),這樣就可以保證子分區(qū)被下游均勻消費(fèi)。

不過這只是一個(gè)臨時(shí)的解決方案,最終應(yīng)該通過自動(dòng)負(fù)載均衡來解決,我們將在后續(xù)版本中實(shí)現(xiàn)。

3.3 動(dòng)態(tài)構(gòu)建執(zhí)行拓?fù)?/h3>

在引入自適應(yīng)批作業(yè)調(diào)度器之前,執(zhí)行拓?fù)涫且造o態(tài)方式構(gòu)建的,也就是在調(diào)度開始前執(zhí)行拓?fù)渚捅煌耆珓?chuàng)建出來了。為了使邏輯節(jié)點(diǎn)并行度可以在運(yùn)行時(shí)決定,執(zhí)行拓?fù)湫枰С謩?dòng)態(tài)構(gòu)建。

3.3.1 向執(zhí)行拓?fù)鋭?dòng)態(tài)添加節(jié)點(diǎn)和邊

動(dòng)態(tài)構(gòu)建執(zhí)行拓?fù)涫侵敢粋€(gè) Flink 作業(yè)從一個(gè)空的執(zhí)行拓?fù)溟_始,然后隨著調(diào)度逐步附加執(zhí)行節(jié)點(diǎn),如圖 2 所示。

執(zhí)行拓?fù)溆蓤?zhí)行節(jié)點(diǎn)和執(zhí)行邊(ExecutionEdge)組成。只有在以下情況下,才會(huì)將邏輯節(jié)點(diǎn)展開創(chuàng)建執(zhí)行節(jié)點(diǎn)并將其添加到執(zhí)行拓?fù)洌?/p>

  1. 對(duì)應(yīng)邏輯節(jié)點(diǎn)的并行度已經(jīng)被確定(以便 Flink 知道應(yīng)該創(chuàng)建多少個(gè)執(zhí)行節(jié)點(diǎn));
  2. 所有上游邏輯節(jié)點(diǎn)都已經(jīng)被展開(以便 Flink 通過執(zhí)行邊將新創(chuàng)建的執(zhí)行節(jié)點(diǎn)和上游執(zhí)行節(jié)點(diǎn)連接起來)。

圖 2 - 動(dòng)態(tài)構(gòu)建執(zhí)行拓?fù)?/p>

3.3.2 子分區(qū)動(dòng)態(tài)映射

在引入自適應(yīng)批作業(yè)調(diào)度器之前,在部署執(zhí)行節(jié)點(diǎn)時(shí),F(xiàn)link 需要知道其下游邏輯節(jié)點(diǎn)的并行度。因?yàn)橄掠芜壿嫻?jié)點(diǎn)的并行度決定了上游執(zhí)行節(jié)點(diǎn)需要產(chǎn)出的子分區(qū)數(shù)量。以圖 3 為例,下游 B 的并行度為 2,因此上游的 A1/A2 需要產(chǎn)生 2 個(gè)子分區(qū),索引為 0 的子分區(qū)被 B1 消費(fèi),索引為 1 的子分區(qū)被 B2 消費(fèi)。

圖 3 - 靜態(tài)執(zhí)行拓?fù)湎M(fèi)子分區(qū)的方式

但顯然,這不適用于動(dòng)態(tài)圖,因?yàn)楫?dāng)部署上游執(zhí)行節(jié)點(diǎn)時(shí),下游邏輯節(jié)點(diǎn)的并行度可能尚未確定(即部署 A1/A2 時(shí),B 的并行度還未確定)。為了解決這個(gè)問題,我們需要使上游執(zhí)行節(jié)點(diǎn)產(chǎn)生的子分區(qū)數(shù)量與下游邏輯節(jié)點(diǎn)的并行度解耦。

我們通過以下方法實(shí)現(xiàn)解耦:將上游執(zhí)行節(jié)點(diǎn)產(chǎn)生子分區(qū)的數(shù)量設(shè)置為下游邏輯節(jié)點(diǎn)的最大并行度(最大并行度是一個(gè)可配置的固定值),然后在下游邏輯節(jié)點(diǎn)并行度被確定后,將這些子分區(qū)均分給不同的下游執(zhí)行節(jié)點(diǎn)進(jìn)行消費(fèi)。也就是說,部署下游執(zhí)行節(jié)點(diǎn)時(shí),每個(gè)下游執(zhí)行節(jié)點(diǎn)都會(huì)被分配到一個(gè)子分區(qū)范圍來消費(fèi)。假設(shè) N 是下游邏輯節(jié)點(diǎn)并行度,P 是子分區(qū)的數(shù)量。對(duì)于第 k 個(gè)下游執(zhí)行節(jié)點(diǎn),消費(fèi)的子分區(qū)范圍應(yīng)該是:

以圖 4 為例,B 的最大并行度為 4,因此 A1/A2 有 4 個(gè)子分區(qū)。然后如果B的確定并行度為 2,則子分區(qū)映射將為圖 4(a),如果B的確定并行度為 3,則子分區(qū)映射將為圖 4(b)。

圖 4 - 動(dòng)態(tài)執(zhí)行拓?fù)湎M(fèi)子分區(qū)的方式

3.4 動(dòng)態(tài)更新并調(diào)度執(zhí)行拓?fù)?/h3>

自適應(yīng)批作業(yè)調(diào)度器調(diào)度作業(yè)的方式和默認(rèn)調(diào)度器基本相同,唯一的區(qū)別是:自適應(yīng)批作業(yè)調(diào)度器是從一個(gè)空的執(zhí)行拓?fù)溟_始調(diào)度,在處理任何調(diào)度事件之前,都會(huì)嘗試決定所有邏輯節(jié)點(diǎn)的并行度,然后嘗試為邏輯節(jié)點(diǎn)生成對(duì)應(yīng)的執(zhí)行節(jié)點(diǎn),并通過執(zhí)行邊連接上游節(jié)點(diǎn),更新執(zhí)行拓?fù)洹?/p>

調(diào)度器會(huì)在每次調(diào)度之前嘗試按照拓?fù)漤樞驔Q定所有邏輯節(jié)點(diǎn)的并行度:

  1. 對(duì)于 source 節(jié)點(diǎn),其并行度會(huì)在開始調(diào)度之前就進(jìn)行確定;
  2. 對(duì)于非 source 節(jié)點(diǎn),需要在其所有上游節(jié)點(diǎn)數(shù)據(jù)產(chǎn)出完成后才能確定其并行度。

然后,調(diào)度程序?qū)L試按照拓?fù)漤樞驅(qū)⑦壿嫻?jié)點(diǎn)展開生成執(zhí)行節(jié)點(diǎn)。一個(gè)可以被展開的邏輯節(jié)點(diǎn)應(yīng)該滿足以下條件:

  1. 該邏輯節(jié)點(diǎn)并行度已確定;
  2. 所有上游邏輯節(jié)點(diǎn)都已經(jīng)被展開。

04未來展望 - 自動(dòng)負(fù)載均衡

運(yùn)行批作業(yè)時(shí),可能會(huì)出現(xiàn)數(shù)據(jù)傾斜(某個(gè)執(zhí)行節(jié)點(diǎn)需要處理的數(shù)據(jù)遠(yuǎn)多于其他執(zhí)行節(jié)點(diǎn)),這會(huì)導(dǎo)作業(yè)出現(xiàn)長(zhǎng)尾現(xiàn)象,拖慢作業(yè)的完成速度。如果 Flink 可以自動(dòng)改善或者解決這個(gè)問題,可以給用戶很大的幫助。

一種典型的數(shù)據(jù)傾斜情況是某些子分區(qū)的數(shù)據(jù)量明顯大于其他子分區(qū)。這種情況可以通過劃分更細(xì)粒度的子分區(qū),并根據(jù)子分區(qū)大小來平衡工作負(fù)載來解決(如圖 5)。自適應(yīng)批作業(yè)調(diào)度器的工作可以被認(rèn)為是邁向它的第一步,因?yàn)樽詣?dòng)重新平衡的要求類似于自適應(yīng)批作業(yè)調(diào)度器,它們都需要?jiǎng)討B(tài)圖的支持和結(jié)果分區(qū)大小的采集。

基于自適應(yīng)批作業(yè)調(diào)度器的實(shí)現(xiàn),我們可以通過增加最大并行度(為了更細(xì)粒度的子分區(qū))和簡(jiǎn)單地更改子分區(qū)范圍劃分算法(為了平衡工作負(fù)載)來解決上述問題。在目前的設(shè)計(jì)中,子分區(qū)范圍是按照子分區(qū)的個(gè)數(shù)來劃分的,我們可以改成按照子分區(qū)中的數(shù)據(jù)量來劃分,這樣每個(gè)子分區(qū)范圍內(nèi)的數(shù)據(jù)量可以大致相同,從而平衡下游執(zhí)行節(jié)點(diǎn)的工作量。

圖 5 - 自動(dòng)負(fù)載均衡

注釋

[1] https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/deployment/elastic_scaling/#adaptive-batch-scheduler

[2] https://github.com/apache/flink/blob/release-1.15/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java

[3] https://github.com/apache/flink/blob/release-1.15/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java

[4] https://github.com/apache/flink/blob/release-1.15/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java

[5] https://github.com/apache/flink/blob/release-1.15/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java

[6] https://nightlies.apache.org/flink/flink-docs-master/zh/docs/internals/job_scheduling/#jobmanager-數(shù)據(jù)結(jié)構(gòu)

[7] https://github.com/apache/flink/blob/release-1.15/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/VertexParallelismDecider.java?


當(dāng)前標(biāo)題:自適應(yīng)批作業(yè)調(diào)度器:為Flink批作業(yè)自動(dòng)推導(dǎo)并行度
鏈接URL:http://www.5511xx.com/article/dhddspg.html