日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷解決方案
Apache Flink 漫談系列(04) - State

實(shí)際問題

專注于為中小企業(yè)提供成都做網(wǎng)站、網(wǎng)站制作服務(wù),電腦端+手機(jī)端+微信端的三站合一,更高效的管理,為中小企業(yè)潁東免費(fèi)做網(wǎng)站提供優(yōu)質(zhì)的服務(wù)。我們立足成都,凝聚了一批互聯(lián)網(wǎng)行業(yè)人才,有力地推動(dòng)了上千家企業(yè)的穩(wěn)健成長(zhǎng),幫助中小企業(yè)通過(guò)網(wǎng)站建設(shè)實(shí)現(xiàn)規(guī)模擴(kuò)充和轉(zhuǎn)變。

在流計(jì)算場(chǎng)景中,數(shù)據(jù)會(huì)源源不斷的流入Apache Flink系統(tǒng),每條數(shù)據(jù)進(jìn)入Apache Flink系統(tǒng)都會(huì)觸發(fā)計(jì)算。如果我們想進(jìn)行一個(gè)Count聚合計(jì)算,那么每次觸發(fā)計(jì)算是將歷史上所有流入的數(shù)據(jù)重新新計(jì)算一次,還是每次計(jì)算都是在上一次計(jì)算結(jié)果之上進(jìn)行增量計(jì)算呢?答案是肯定的,Apache Flink是基于上一次的計(jì)算結(jié)果進(jìn)行增量計(jì)算的。那么問題來(lái)了: "上一次的計(jì)算結(jié)果保存在哪里,保存在內(nèi)存可以嗎?",答案是否定的,如果保存在內(nèi)存,在由于網(wǎng)絡(luò),硬件等原因造成某個(gè)計(jì)算節(jié)點(diǎn)失敗的情況下,上一次計(jì)算結(jié)果會(huì)丟失,在節(jié)點(diǎn)恢復(fù)的時(shí)候,就需要將歷史上所有數(shù)據(jù)(可能十幾天,上百天的數(shù)據(jù))重新計(jì)算一次,所以為了避免這種災(zāi)難性的問題發(fā)生,Apache Flink 會(huì)利用State存儲(chǔ)計(jì)算結(jié)果。本篇將會(huì)為大家介紹Apache Flink State的相關(guān)內(nèi)容。

什么是State

這個(gè)問題似乎有些"***"?不管問題的答案是否顯而易見,但我還是想簡(jiǎn)單說(shuō)一下在Apache Flink里面什么是State?State是指流計(jì)算過(guò)程中計(jì)算節(jié)點(diǎn)的中間計(jì)算結(jié)果或元數(shù)據(jù)屬性,比如 在aggregation過(guò)程中要在state中記錄中間聚合結(jié)果,比如 Apache Kafka 作為數(shù)據(jù)源時(shí)候,我們也要記錄已經(jīng)讀取記錄的offset,這些State數(shù)據(jù)在計(jì)算過(guò)程中會(huì)進(jìn)行持久化(插入或更新)。所以Apache Flink中的State就是與時(shí)間相關(guān)的,Apache Flink任務(wù)的內(nèi)部數(shù)據(jù)(計(jì)算數(shù)據(jù)和元數(shù)據(jù)屬性)的快照。

為什么需要State

與批計(jì)算相比,State是流計(jì)算特有的,批計(jì)算沒有failover機(jī)制,要么成功,要么重新計(jì)算。流計(jì)算在 大多數(shù)場(chǎng)景 下是增量計(jì)算,數(shù)據(jù)逐條處理(大多數(shù)場(chǎng)景),每次計(jì)算是在上一次計(jì)算結(jié)果之上進(jìn)行處理的,這樣的機(jī)制勢(shì)必要將上一次的計(jì)算結(jié)果進(jìn)行存儲(chǔ)(生產(chǎn)模式要持久化),另外由于 機(jī)器,網(wǎng)絡(luò),臟數(shù)據(jù)等原因?qū)е碌某绦蝈e(cuò)誤,在重啟job時(shí)候需要從成功的檢查點(diǎn)(checkpoint,后面篇章會(huì)專門介紹)進(jìn)行state的恢復(fù)。增量計(jì)算,F(xiàn)ailover這些機(jī)制都需要state的支撐。

State 實(shí)現(xiàn)

Apache Flink內(nèi)部有四種state的存儲(chǔ)實(shí)現(xiàn),具體如下:

  • 基于內(nèi)存的HeapStateBackend - 在debug模式使用,不 建議在生產(chǎn)模式下應(yīng)用;
  • 基于HDFS的FsStateBackend - 分布式文件持久化,每次讀寫都產(chǎn)生網(wǎng)絡(luò)IO,整體性能不佳;
  • 基于RocksDB的RocksDBStateBackend - 本地文件+異步HDFS持久化;
  • 還有一個(gè)是基于Niagara(Alibaba對(duì) Apache Flink的增強(qiáng))NiagaraStateBackend - 分布式持久化- 在Alibaba生產(chǎn)環(huán)境應(yīng)用;

State 持久化邏輯

Apache Flink版本選擇用RocksDB+HDFS的方式進(jìn)行State的存儲(chǔ),State存儲(chǔ)分兩個(gè)階段,首先本地存儲(chǔ)到RocksDB,然后異步的同步到遠(yuǎn)程的HDFS。 這樣而設(shè)計(jì)既消除了HeapStateBackend的局限(內(nèi)存大小,機(jī)器壞掉丟失等),也減少了純分布式存儲(chǔ)的網(wǎng)絡(luò)IO開銷。

State 分類

Apache Flink 內(nèi)部按照算子和數(shù)據(jù)分組角度將State劃分為如下兩類:

  • KeyedState - 這里面的key是我們?cè)赟QL語(yǔ)句中對(duì)應(yīng)的GroupBy/PartitioneBy里面的字段,key的值就是groupby/PartitionBy字段組成的Row的字節(jié)數(shù)組,每一個(gè)key都有一個(gè)屬于自己的State,key與key之間的State是不可見的;
  • OperatorState - Apache Flink內(nèi)部的Source Connector的實(shí)現(xiàn)中就會(huì)用OperatorState來(lái)記錄source數(shù)據(jù)讀取的offset。

State 擴(kuò)容重新分配

Apache Flink是一個(gè)大規(guī)模并行分布式系統(tǒng),允許大規(guī)模的有狀態(tài)流處理。 為了可伸縮性,Apache Flink作業(yè)在邏輯上被分解成operator graph,并且每個(gè)operator的執(zhí)行被物理地分解成多個(gè)并行運(yùn)算符實(shí)例。 從概念上講,Apache Flink中的每個(gè)并行運(yùn)算符實(shí)例都是一個(gè)獨(dú)立的任務(wù),可以在自己的機(jī)器上調(diào)度到網(wǎng)絡(luò)連接的其他機(jī)器運(yùn)行。

Apache Flink的DAG圖中只有邊相連的節(jié)點(diǎn)網(wǎng)絡(luò)通信,也就是整個(gè)DAG在垂直方向有網(wǎng)絡(luò)IO,在水平方向如下圖的stateful節(jié)點(diǎn)之間沒有網(wǎng)絡(luò)通信,這種模型也保證了每個(gè)operator實(shí)例維護(hù)一份自己的state,并且保存在本地磁盤(遠(yuǎn)程異步同步)。通過(guò)這種設(shè)計(jì),任務(wù)的所有狀態(tài)數(shù)據(jù)都是本地的,并且狀態(tài)訪問不需要任務(wù)之間的網(wǎng)絡(luò)通信。 避免這種流量對(duì)于像Apache Flink這樣的大規(guī)模并行分布式系統(tǒng)的可擴(kuò)展性至關(guān)重要。

如上我們知道Apache Flink中State有OperatorState和KeyedState,那么在進(jìn)行擴(kuò)容時(shí)候(增加并發(fā))State如何分配呢?比如:外部Source有5個(gè)partition,在Apache Flink上面由Srouce的1個(gè)并發(fā)擴(kuò)容到2個(gè)并發(fā),中間Stateful Operation 節(jié)點(diǎn)由2個(gè)并發(fā)并擴(kuò)容的3個(gè)并發(fā),如下圖所示:

在Apache Flink中對(duì)不同類型的State有不同的擴(kuò)容方法,接下來(lái)我們分別介紹。

OperatorState對(duì)擴(kuò)容的處理

我們選取Apache Flink中某個(gè)具體Connector實(shí)現(xiàn)實(shí)例進(jìn)行介紹,以MetaQ為例,MetaQ以topic方式訂閱數(shù)據(jù),每個(gè)topic會(huì)有N>0個(gè)分區(qū),以上圖為例,加上我們訂閱的MetaQ的topic有5個(gè)分區(qū),那么當(dāng)我們source由1個(gè)并發(fā)調(diào)整為2個(gè)并發(fā)時(shí)候,State是怎么恢復(fù)的呢?

state 恢復(fù)的方式與Source中OperatorState的存儲(chǔ)結(jié)構(gòu)有必然關(guān)系,我們先看MetaQSource的實(shí)現(xiàn)是如何存儲(chǔ)State的。首先MetaQSource 實(shí)現(xiàn)了ListCheckpointed ,其中的T是Tuple2

 
 
 
 
  1. public interface ListCheckpointed { 
  2.     List snapshotState(long var1, long var3) throws Exception; 
  3.  
  4.     void restoreState(List var1) throws Exception;} 

我們發(fā)現(xiàn) snapshotState方法的返回值是一個(gè)List ,T是Tuple2

 
 
 
 
  1. public interface InputSplit extends Serializable { 
  2.     int getSplitNumber(); 

也就是說(shuō),InputSplit我們可以理解為是一個(gè)Partition索引,有了這個(gè)數(shù)據(jù)結(jié)構(gòu)我們?cè)诳纯瓷厦鎴D所示的case是如何工作的?當(dāng)Source的并行度是1的時(shí)候,所有打partition數(shù)據(jù)都在同一個(gè)線程中讀取,所有partition的state也在同一個(gè)state中維護(hù),State存儲(chǔ)信息格式如下:

如果我們現(xiàn)在將并發(fā)調(diào)整為2,那么我們5個(gè)分區(qū)的State將會(huì)在2個(gè)獨(dú)立的任務(wù)(線程)中進(jìn)行維護(hù),在內(nèi)部實(shí)現(xiàn)中我們有如下算法進(jìn)行分配每個(gè)Task所處理和維護(hù)partition的State信息,如下:

 
 
 
 
  1. List assignedPartitions = new LinkedList<>(); 
  2. for (int i = 0; i < partitions; i++) { 
  3.         if (i % consumerCount == consumerIndex) { 
  4.                 assignedPartitions.add(i); 
  5.         } 

這個(gè)求mod的算法,決定了每個(gè)并發(fā)所處理和維護(hù)partition的State信息,針對(duì)我們當(dāng)前的case具體的存儲(chǔ)情況如下:

那么到現(xiàn)在我們發(fā)現(xiàn)上面擴(kuò)容后State得以很好的分配得益于OperatorState采用了List 的數(shù)據(jù)結(jié)構(gòu)的設(shè)計(jì)。另外大家注意一個(gè)問題,相信大家已經(jīng)發(fā)現(xiàn)上面分配partition的算法有一個(gè)限制,那就是Source的擴(kuò)容(并發(fā)數(shù))是否可以超過(guò)Source物理存儲(chǔ)的partition數(shù)量呢?答案是否定的,不能。目前Apache Flink的做法是提前報(bào)錯(cuò),即使不報(bào)錯(cuò)也是資源的浪費(fèi),因?yàn)槌^(guò)partition數(shù)量的并發(fā)永遠(yuǎn)分配不到待管理的partition。

KeyedState對(duì)擴(kuò)容的處理

對(duì)于KeyedState最容易想到的是hash(key) mod parallelism(operator) 方式分配state,就和OperatorState一樣,這種分配方式大多數(shù)情況是恢復(fù)的state不是本地已有的state,需要一次網(wǎng)絡(luò)拷貝,這種效率比較低,OperatorState采用這種簡(jiǎn)單的方式進(jìn)行處理是因?yàn)镺peratorState的state一般都比較小,網(wǎng)絡(luò)拉取的成本很小,對(duì)于KeyedState往往很大,我們會(huì)有更好的選擇,在Apache Flink中采用的是Key-Groups方式進(jìn)行分配。

什么是Key-Groups

Key-Groups 是Apache Flink中對(duì)keyed state按照key進(jìn)行分組的方式,每個(gè)key-group中會(huì)包含N>0個(gè)key,一個(gè)key-group是State分配的原子單位。在Apache Flink中關(guān)于Key-Group的對(duì)象是 KeyGroupRange, 如下:

 
 
 
 
  1. public class KeyGroupRange implements KeyGroupsList, Serializable { 
  2.         ... 
  3.         ... 
  4.         private final int startKeyGroup; 
  5.         private final int endKeyGroup; 
  6.         ... 
  7.         ...} 

KeyGroupRange兩個(gè)重要的屬性就是 startKeyGroup和endKeyGroup,定義了startKeyGroup和endKeyGroup屬性后Operator上面的Key-Group的個(gè)數(shù)也就確定了。

什么決定Key-Groups的個(gè)數(shù)

key-group的數(shù)量在job啟動(dòng)前必須是確定的且運(yùn)行中不能改變。由于key-group是state分配的原子單位,而每個(gè)operator并行實(shí)例至少包含一個(gè)key-group,因此operator的***并行度不能超過(guò)設(shè)定的key-group的個(gè)數(shù),那么在Apache Flink的內(nèi)部實(shí)現(xiàn)上key-group的數(shù)量就是***并行度的值。

GroupRange.of(0, maxParallelism)如何決定key屬于哪個(gè)Key-Group

確定好GroupRange之后,如何決定每個(gè)Key屬于哪個(gè)Key-Group呢?我們采取的是取mod的方式,在KeyGroupRangeAssignment中的assignToKeyGroup方法會(huì)將key劃分到指定的key-group中,如下:

 
 
 
 
  1. public static int assignToKeyGroup(Object key, int maxParallelism) { 
  2.       return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism); 
  3.  
  4. public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) { 
  5.       return HashPartitioner.INSTANCE.partition(keyHash, maxParallelism); 
  6.  
  7. @Override 
  8. public int partition(T key, int numPartitions) { 
  9.       return MathUtils.murmurHash(Objects.hashCode(key)) % numPartitions; 

如上實(shí)現(xiàn)我們了解到分配Key到指定的key-group的邏輯是利用key的hashCode和maxParallelism進(jìn)行取余操作來(lái)分配的。如下圖當(dāng)parallelism=2,maxParallelism=10的情況下流上key與key-group的對(duì)應(yīng)關(guān)系如下圖所示:

如上圖key(a)的hashCode是97,與***并發(fā)10取余后是7,被分配到了KG-7中,流上每個(gè)event都會(huì)分配到KG-0至KG-9其中一個(gè)Key-Group中。

每個(gè)Operator實(shí)例如何獲取Key-Groups

了解了Key-Groups概念和如何分配每個(gè)Key到指定的Key-Groups之后,我們看看如何計(jì)算每個(gè)Operator實(shí)例所處理的Key-Groups。 在KeyGroupRangeAssignment的computeKeyGroupRangeForOperatorIndex方法描述了分配算法:

 
 
 
 
  1. public static KeyGroupRange computeKeyGroupRangeForOperatorIndex( 
  2.       int maxParallelism, 
  3.       int parallelism, 
  4.       int operatorIndex) { 
  5.         GroupRange splitRange = GroupRange.of(0, maxParallelism).getSplitRange(parallelism, operatorIndex); 
  6.         int startGroup = splitRange.getStartGroup(); 
  7.         int endGroup = splitRange.getEndGroup(); 
  8.    return new KeyGroupRange(startGroup, endGroup - 1); 
  9.  
  10. public GroupRange getSplitRange(int numSplits, int splitIndex) { 
  11.         ... 
  12.         final int numGroupsPerSplit = getNumGroups() / numSplits; 
  13.         final int numFatSplits = getNumGroups() % numSplits; 
  14.  
  15.         int startGroupForThisSplit; 
  16.         int endGroupForThisSplit; 
  17.         if (splitIndex < numFatSplits) { 
  18.             startGroupForThisSplit = getStartGroup() + splitIndex * (numGroupsPerSplit + 1); 
  19.             endGroupForThisSplit =   startGroupForThisSplit + numGroupsPerSplit + 1; 
  20.         } else { 
  21.             startGroupForThisSplit = getStartGroup() + splitIndex * numGroupsPerSplit + numFatSplits; 
  22.             endGroupForThisSplit =  startGroupForThisSplit + numGroupsPerSplit; 
  23.         } 
  24.         if (startGroupForThisSplit >= endGroupForThisSplit) { 
  25.                 return GroupRange.emptyGroupRange(); 
  26.         } else { 
  27.                 return new GroupRange(startGroupForThisSplit, endGroupForThisSplit); 
  28.         }} 

上面代碼的核心邏輯是先計(jì)算每個(gè)Operator實(shí)例至少分配的Key-Group個(gè)數(shù),將不能整除的部分N個(gè),平均分給前N個(gè)實(shí)例。最終每個(gè)Operator實(shí)例管理的Key-Groups會(huì)在GroupRange中表示,本質(zhì)是一個(gè)區(qū)間值;下面我們就上圖的case,說(shuō)明一下如何進(jìn)行分配以及擴(kuò)容后如何重新分配。

假設(shè)上面的Stateful Operation節(jié)點(diǎn)的***并行度maxParallelism的值是10,也就是我們一共有10個(gè)Key-Group,當(dāng)我們并發(fā)是2的時(shí)候和并發(fā)是3的時(shí)候分配的情況如下圖:

如上算法我們發(fā)現(xiàn)在進(jìn)行擴(kuò)容時(shí)候,大部分state還是落到本地的,如Task0只有KG-4被分出去,其他的還是保持在本地。同時(shí)我們也發(fā)現(xiàn),一個(gè)job如果修改了maxParallelism的值那么會(huì)直接影響到Key-Groups的數(shù)量和key的分配,也會(huì)打亂所有的Key-Group的分配,目前在Apache Flink系統(tǒng)中統(tǒng)一將maxParallelism的默認(rèn)值調(diào)整到4096,***程度的避免無(wú)法擴(kuò)容的情況發(fā)生。

小結(jié)

本篇簡(jiǎn)單介紹了Apache Flink中State的概念,并重點(diǎn)介紹了OperatorState和KeyedState在擴(kuò)容時(shí)候的處理方式。Apache Flink State是支撐Apache Flink中failover,增量計(jì)算,Window等重要機(jī)制和功能的核心設(shè)施。后續(xù)介紹failover,增量計(jì)算,Window等相關(guān)篇章中也會(huì)涉及State的利用,當(dāng)涉及到本篇沒有覆蓋的內(nèi)容時(shí)候再補(bǔ)充介紹。同時(shí)本篇沒有介紹Alibaba對(duì)Apache Flink的增強(qiáng)的Niagara版本的State。Niagara是Alibaba精心打造的新一代適用于流計(jì)算場(chǎng)景的StateBackend存儲(chǔ)實(shí)現(xiàn),相關(guān)內(nèi)容后續(xù)在合適時(shí)間再向大家介紹。

關(guān)于點(diǎn)贊和評(píng)論

本系列文章難免有很多缺陷和不足,真誠(chéng)希望讀者對(duì)有收獲的篇章給予點(diǎn)贊鼓勵(lì),對(duì)有不足的篇章給予反饋和建議,先行感謝大家!

作者:孫金城,花名 金竹,目前就職于阿里巴巴,自2015年以來(lái)一直投入于基于Apache Flink的阿里巴巴計(jì)算平臺(tái)Blink的設(shè)計(jì)研發(fā)工作。

【本文為專欄作者“金竹”原創(chuàng)稿件,轉(zhuǎn)載請(qǐng)聯(lián)系原作者】

戳這里,看該作者更多好文


當(dāng)前文章:Apache Flink 漫談系列(04) - State
標(biāo)題鏈接:http://www.5511xx.com/article/djpipph.html