新聞中心
實(shí)際問題

專注于為中小企業(yè)提供成都做網(wǎng)站、網(wǎng)站制作服務(wù),電腦端+手機(jī)端+微信端的三站合一,更高效的管理,為中小企業(yè)潁東免費(fèi)做網(wǎng)站提供優(yōu)質(zhì)的服務(wù)。我們立足成都,凝聚了一批互聯(lián)網(wǎng)行業(yè)人才,有力地推動(dòng)了上千家企業(yè)的穩(wěn)健成長(zhǎng),幫助中小企業(yè)通過(guò)網(wǎng)站建設(shè)實(shí)現(xiàn)規(guī)模擴(kuò)充和轉(zhuǎn)變。
在流計(jì)算場(chǎng)景中,數(shù)據(jù)會(huì)源源不斷的流入Apache Flink系統(tǒng),每條數(shù)據(jù)進(jìn)入Apache Flink系統(tǒng)都會(huì)觸發(fā)計(jì)算。如果我們想進(jìn)行一個(gè)Count聚合計(jì)算,那么每次觸發(fā)計(jì)算是將歷史上所有流入的數(shù)據(jù)重新新計(jì)算一次,還是每次計(jì)算都是在上一次計(jì)算結(jié)果之上進(jìn)行增量計(jì)算呢?答案是肯定的,Apache Flink是基于上一次的計(jì)算結(jié)果進(jìn)行增量計(jì)算的。那么問題來(lái)了: "上一次的計(jì)算結(jié)果保存在哪里,保存在內(nèi)存可以嗎?",答案是否定的,如果保存在內(nèi)存,在由于網(wǎng)絡(luò),硬件等原因造成某個(gè)計(jì)算節(jié)點(diǎn)失敗的情況下,上一次計(jì)算結(jié)果會(huì)丟失,在節(jié)點(diǎn)恢復(fù)的時(shí)候,就需要將歷史上所有數(shù)據(jù)(可能十幾天,上百天的數(shù)據(jù))重新計(jì)算一次,所以為了避免這種災(zāi)難性的問題發(fā)生,Apache Flink 會(huì)利用State存儲(chǔ)計(jì)算結(jié)果。本篇將會(huì)為大家介紹Apache Flink State的相關(guān)內(nèi)容。
什么是State
這個(gè)問題似乎有些"***"?不管問題的答案是否顯而易見,但我還是想簡(jiǎn)單說(shuō)一下在Apache Flink里面什么是State?State是指流計(jì)算過(guò)程中計(jì)算節(jié)點(diǎn)的中間計(jì)算結(jié)果或元數(shù)據(jù)屬性,比如 在aggregation過(guò)程中要在state中記錄中間聚合結(jié)果,比如 Apache Kafka 作為數(shù)據(jù)源時(shí)候,我們也要記錄已經(jīng)讀取記錄的offset,這些State數(shù)據(jù)在計(jì)算過(guò)程中會(huì)進(jìn)行持久化(插入或更新)。所以Apache Flink中的State就是與時(shí)間相關(guān)的,Apache Flink任務(wù)的內(nèi)部數(shù)據(jù)(計(jì)算數(shù)據(jù)和元數(shù)據(jù)屬性)的快照。
為什么需要State
與批計(jì)算相比,State是流計(jì)算特有的,批計(jì)算沒有failover機(jī)制,要么成功,要么重新計(jì)算。流計(jì)算在 大多數(shù)場(chǎng)景 下是增量計(jì)算,數(shù)據(jù)逐條處理(大多數(shù)場(chǎng)景),每次計(jì)算是在上一次計(jì)算結(jié)果之上進(jìn)行處理的,這樣的機(jī)制勢(shì)必要將上一次的計(jì)算結(jié)果進(jìn)行存儲(chǔ)(生產(chǎn)模式要持久化),另外由于 機(jī)器,網(wǎng)絡(luò),臟數(shù)據(jù)等原因?qū)е碌某绦蝈e(cuò)誤,在重啟job時(shí)候需要從成功的檢查點(diǎn)(checkpoint,后面篇章會(huì)專門介紹)進(jìn)行state的恢復(fù)。增量計(jì)算,F(xiàn)ailover這些機(jī)制都需要state的支撐。
State 實(shí)現(xiàn)
Apache Flink內(nèi)部有四種state的存儲(chǔ)實(shí)現(xiàn),具體如下:
- 基于內(nèi)存的HeapStateBackend - 在debug模式使用,不 建議在生產(chǎn)模式下應(yīng)用;
- 基于HDFS的FsStateBackend - 分布式文件持久化,每次讀寫都產(chǎn)生網(wǎng)絡(luò)IO,整體性能不佳;
- 基于RocksDB的RocksDBStateBackend - 本地文件+異步HDFS持久化;
- 還有一個(gè)是基于Niagara(Alibaba對(duì) Apache Flink的增強(qiáng))NiagaraStateBackend - 分布式持久化- 在Alibaba生產(chǎn)環(huán)境應(yīng)用;
State 持久化邏輯
Apache Flink版本選擇用RocksDB+HDFS的方式進(jìn)行State的存儲(chǔ),State存儲(chǔ)分兩個(gè)階段,首先本地存儲(chǔ)到RocksDB,然后異步的同步到遠(yuǎn)程的HDFS。 這樣而設(shè)計(jì)既消除了HeapStateBackend的局限(內(nèi)存大小,機(jī)器壞掉丟失等),也減少了純分布式存儲(chǔ)的網(wǎng)絡(luò)IO開銷。
State 分類
Apache Flink 內(nèi)部按照算子和數(shù)據(jù)分組角度將State劃分為如下兩類:
- KeyedState - 這里面的key是我們?cè)赟QL語(yǔ)句中對(duì)應(yīng)的GroupBy/PartitioneBy里面的字段,key的值就是groupby/PartitionBy字段組成的Row的字節(jié)數(shù)組,每一個(gè)key都有一個(gè)屬于自己的State,key與key之間的State是不可見的;
- OperatorState - Apache Flink內(nèi)部的Source Connector的實(shí)現(xiàn)中就會(huì)用OperatorState來(lái)記錄source數(shù)據(jù)讀取的offset。
State 擴(kuò)容重新分配
Apache Flink是一個(gè)大規(guī)模并行分布式系統(tǒng),允許大規(guī)模的有狀態(tài)流處理。 為了可伸縮性,Apache Flink作業(yè)在邏輯上被分解成operator graph,并且每個(gè)operator的執(zhí)行被物理地分解成多個(gè)并行運(yùn)算符實(shí)例。 從概念上講,Apache Flink中的每個(gè)并行運(yùn)算符實(shí)例都是一個(gè)獨(dú)立的任務(wù),可以在自己的機(jī)器上調(diào)度到網(wǎng)絡(luò)連接的其他機(jī)器運(yùn)行。
Apache Flink的DAG圖中只有邊相連的節(jié)點(diǎn)網(wǎng)絡(luò)通信,也就是整個(gè)DAG在垂直方向有網(wǎng)絡(luò)IO,在水平方向如下圖的stateful節(jié)點(diǎn)之間沒有網(wǎng)絡(luò)通信,這種模型也保證了每個(gè)operator實(shí)例維護(hù)一份自己的state,并且保存在本地磁盤(遠(yuǎn)程異步同步)。通過(guò)這種設(shè)計(jì),任務(wù)的所有狀態(tài)數(shù)據(jù)都是本地的,并且狀態(tài)訪問不需要任務(wù)之間的網(wǎng)絡(luò)通信。 避免這種流量對(duì)于像Apache Flink這樣的大規(guī)模并行分布式系統(tǒng)的可擴(kuò)展性至關(guān)重要。
如上我們知道Apache Flink中State有OperatorState和KeyedState,那么在進(jìn)行擴(kuò)容時(shí)候(增加并發(fā))State如何分配呢?比如:外部Source有5個(gè)partition,在Apache Flink上面由Srouce的1個(gè)并發(fā)擴(kuò)容到2個(gè)并發(fā),中間Stateful Operation 節(jié)點(diǎn)由2個(gè)并發(fā)并擴(kuò)容的3個(gè)并發(fā),如下圖所示:
在Apache Flink中對(duì)不同類型的State有不同的擴(kuò)容方法,接下來(lái)我們分別介紹。
OperatorState對(duì)擴(kuò)容的處理
我們選取Apache Flink中某個(gè)具體Connector實(shí)現(xiàn)實(shí)例進(jìn)行介紹,以MetaQ為例,MetaQ以topic方式訂閱數(shù)據(jù),每個(gè)topic會(huì)有N>0個(gè)分區(qū),以上圖為例,加上我們訂閱的MetaQ的topic有5個(gè)分區(qū),那么當(dāng)我們source由1個(gè)并發(fā)調(diào)整為2個(gè)并發(fā)時(shí)候,State是怎么恢復(fù)的呢?
state 恢復(fù)的方式與Source中OperatorState的存儲(chǔ)結(jié)構(gòu)有必然關(guān)系,我們先看MetaQSource的實(shí)現(xiàn)是如何存儲(chǔ)State的。首先MetaQSource 實(shí)現(xiàn)了ListCheckpointed ,其中的T是Tuple2
- public interface ListCheckpointed
{ - List
snapshotState(long var1, long var3) throws Exception; - void restoreState(List
var1) throws Exception;}
我們發(fā)現(xiàn) snapshotState方法的返回值是一個(gè)List ,T是Tuple2
- public interface InputSplit extends Serializable {
- int getSplitNumber();
- }
也就是說(shuō),InputSplit我們可以理解為是一個(gè)Partition索引,有了這個(gè)數(shù)據(jù)結(jié)構(gòu)我們?cè)诳纯瓷厦鎴D所示的case是如何工作的?當(dāng)Source的并行度是1的時(shí)候,所有打partition數(shù)據(jù)都在同一個(gè)線程中讀取,所有partition的state也在同一個(gè)state中維護(hù),State存儲(chǔ)信息格式如下:
如果我們現(xiàn)在將并發(fā)調(diào)整為2,那么我們5個(gè)分區(qū)的State將會(huì)在2個(gè)獨(dú)立的任務(wù)(線程)中進(jìn)行維護(hù),在內(nèi)部實(shí)現(xiàn)中我們有如下算法進(jìn)行分配每個(gè)Task所處理和維護(hù)partition的State信息,如下:
- List
assignedPartitions = new LinkedList<>(); - for (int i = 0; i < partitions; i++) {
- if (i % consumerCount == consumerIndex) {
- assignedPartitions.add(i);
- }
- }
這個(gè)求mod的算法,決定了每個(gè)并發(fā)所處理和維護(hù)partition的State信息,針對(duì)我們當(dāng)前的case具體的存儲(chǔ)情況如下:
那么到現(xiàn)在我們發(fā)現(xiàn)上面擴(kuò)容后State得以很好的分配得益于OperatorState采用了List 的數(shù)據(jù)結(jié)構(gòu)的設(shè)計(jì)。另外大家注意一個(gè)問題,相信大家已經(jīng)發(fā)現(xiàn)上面分配partition的算法有一個(gè)限制,那就是Source的擴(kuò)容(并發(fā)數(shù))是否可以超過(guò)Source物理存儲(chǔ)的partition數(shù)量呢?答案是否定的,不能。目前Apache Flink的做法是提前報(bào)錯(cuò),即使不報(bào)錯(cuò)也是資源的浪費(fèi),因?yàn)槌^(guò)partition數(shù)量的并發(fā)永遠(yuǎn)分配不到待管理的partition。
KeyedState對(duì)擴(kuò)容的處理
對(duì)于KeyedState最容易想到的是hash(key) mod parallelism(operator) 方式分配state,就和OperatorState一樣,這種分配方式大多數(shù)情況是恢復(fù)的state不是本地已有的state,需要一次網(wǎng)絡(luò)拷貝,這種效率比較低,OperatorState采用這種簡(jiǎn)單的方式進(jìn)行處理是因?yàn)镺peratorState的state一般都比較小,網(wǎng)絡(luò)拉取的成本很小,對(duì)于KeyedState往往很大,我們會(huì)有更好的選擇,在Apache Flink中采用的是Key-Groups方式進(jìn)行分配。
什么是Key-Groups
Key-Groups 是Apache Flink中對(duì)keyed state按照key進(jìn)行分組的方式,每個(gè)key-group中會(huì)包含N>0個(gè)key,一個(gè)key-group是State分配的原子單位。在Apache Flink中關(guān)于Key-Group的對(duì)象是 KeyGroupRange, 如下:
- public class KeyGroupRange implements KeyGroupsList, Serializable {
- ...
- ...
- private final int startKeyGroup;
- private final int endKeyGroup;
- ...
- ...}
KeyGroupRange兩個(gè)重要的屬性就是 startKeyGroup和endKeyGroup,定義了startKeyGroup和endKeyGroup屬性后Operator上面的Key-Group的個(gè)數(shù)也就確定了。
什么決定Key-Groups的個(gè)數(shù)
key-group的數(shù)量在job啟動(dòng)前必須是確定的且運(yùn)行中不能改變。由于key-group是state分配的原子單位,而每個(gè)operator并行實(shí)例至少包含一個(gè)key-group,因此operator的***并行度不能超過(guò)設(shè)定的key-group的個(gè)數(shù),那么在Apache Flink的內(nèi)部實(shí)現(xiàn)上key-group的數(shù)量就是***并行度的值。
GroupRange.of(0, maxParallelism)如何決定key屬于哪個(gè)Key-Group
確定好GroupRange之后,如何決定每個(gè)Key屬于哪個(gè)Key-Group呢?我們采取的是取mod的方式,在KeyGroupRangeAssignment中的assignToKeyGroup方法會(huì)將key劃分到指定的key-group中,如下:
- public static int assignToKeyGroup(Object key, int maxParallelism) {
- return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
- }
- public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
- return HashPartitioner.INSTANCE.partition(keyHash, maxParallelism);
- }
- @Override
- public int partition(T key, int numPartitions) {
- return MathUtils.murmurHash(Objects.hashCode(key)) % numPartitions;
- }
如上實(shí)現(xiàn)我們了解到分配Key到指定的key-group的邏輯是利用key的hashCode和maxParallelism進(jìn)行取余操作來(lái)分配的。如下圖當(dāng)parallelism=2,maxParallelism=10的情況下流上key與key-group的對(duì)應(yīng)關(guān)系如下圖所示:
如上圖key(a)的hashCode是97,與***并發(fā)10取余后是7,被分配到了KG-7中,流上每個(gè)event都會(huì)分配到KG-0至KG-9其中一個(gè)Key-Group中。
每個(gè)Operator實(shí)例如何獲取Key-Groups
了解了Key-Groups概念和如何分配每個(gè)Key到指定的Key-Groups之后,我們看看如何計(jì)算每個(gè)Operator實(shí)例所處理的Key-Groups。 在KeyGroupRangeAssignment的computeKeyGroupRangeForOperatorIndex方法描述了分配算法:
- public static KeyGroupRange computeKeyGroupRangeForOperatorIndex(
- int maxParallelism,
- int parallelism,
- int operatorIndex) {
- GroupRange splitRange = GroupRange.of(0, maxParallelism).getSplitRange(parallelism, operatorIndex);
- int startGroup = splitRange.getStartGroup();
- int endGroup = splitRange.getEndGroup();
- return new KeyGroupRange(startGroup, endGroup - 1);
- }
- public GroupRange getSplitRange(int numSplits, int splitIndex) {
- ...
- final int numGroupsPerSplit = getNumGroups() / numSplits;
- final int numFatSplits = getNumGroups() % numSplits;
- int startGroupForThisSplit;
- int endGroupForThisSplit;
- if (splitIndex < numFatSplits) {
- startGroupForThisSplit = getStartGroup() + splitIndex * (numGroupsPerSplit + 1);
- endGroupForThisSplit = startGroupForThisSplit + numGroupsPerSplit + 1;
- } else {
- startGroupForThisSplit = getStartGroup() + splitIndex * numGroupsPerSplit + numFatSplits;
- endGroupForThisSplit = startGroupForThisSplit + numGroupsPerSplit;
- }
- if (startGroupForThisSplit >= endGroupForThisSplit) {
- return GroupRange.emptyGroupRange();
- } else {
- return new GroupRange(startGroupForThisSplit, endGroupForThisSplit);
- }}
上面代碼的核心邏輯是先計(jì)算每個(gè)Operator實(shí)例至少分配的Key-Group個(gè)數(shù),將不能整除的部分N個(gè),平均分給前N個(gè)實(shí)例。最終每個(gè)Operator實(shí)例管理的Key-Groups會(huì)在GroupRange中表示,本質(zhì)是一個(gè)區(qū)間值;下面我們就上圖的case,說(shuō)明一下如何進(jìn)行分配以及擴(kuò)容后如何重新分配。
假設(shè)上面的Stateful Operation節(jié)點(diǎn)的***并行度maxParallelism的值是10,也就是我們一共有10個(gè)Key-Group,當(dāng)我們并發(fā)是2的時(shí)候和并發(fā)是3的時(shí)候分配的情況如下圖:
如上算法我們發(fā)現(xiàn)在進(jìn)行擴(kuò)容時(shí)候,大部分state還是落到本地的,如Task0只有KG-4被分出去,其他的還是保持在本地。同時(shí)我們也發(fā)現(xiàn),一個(gè)job如果修改了maxParallelism的值那么會(huì)直接影響到Key-Groups的數(shù)量和key的分配,也會(huì)打亂所有的Key-Group的分配,目前在Apache Flink系統(tǒng)中統(tǒng)一將maxParallelism的默認(rèn)值調(diào)整到4096,***程度的避免無(wú)法擴(kuò)容的情況發(fā)生。
小結(jié)
本篇簡(jiǎn)單介紹了Apache Flink中State的概念,并重點(diǎn)介紹了OperatorState和KeyedState在擴(kuò)容時(shí)候的處理方式。Apache Flink State是支撐Apache Flink中failover,增量計(jì)算,Window等重要機(jī)制和功能的核心設(shè)施。后續(xù)介紹failover,增量計(jì)算,Window等相關(guān)篇章中也會(huì)涉及State的利用,當(dāng)涉及到本篇沒有覆蓋的內(nèi)容時(shí)候再補(bǔ)充介紹。同時(shí)本篇沒有介紹Alibaba對(duì)Apache Flink的增強(qiáng)的Niagara版本的State。Niagara是Alibaba精心打造的新一代適用于流計(jì)算場(chǎng)景的StateBackend存儲(chǔ)實(shí)現(xiàn),相關(guān)內(nèi)容后續(xù)在合適時(shí)間再向大家介紹。
關(guān)于點(diǎn)贊和評(píng)論
本系列文章難免有很多缺陷和不足,真誠(chéng)希望讀者對(duì)有收獲的篇章給予點(diǎn)贊鼓勵(lì),對(duì)有不足的篇章給予反饋和建議,先行感謝大家!
作者:孫金城,花名 金竹,目前就職于阿里巴巴,自2015年以來(lái)一直投入于基于Apache Flink的阿里巴巴計(jì)算平臺(tái)Blink的設(shè)計(jì)研發(fā)工作。
【本文為專欄作者“金竹”原創(chuàng)稿件,轉(zhuǎn)載請(qǐng)聯(lián)系原作者】
戳這里,看該作者更多好文
當(dāng)前文章:Apache Flink 漫談系列(04) - State
標(biāo)題鏈接:http://www.5511xx.com/article/djpipph.html


咨詢
建站咨詢
