日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
FlinkCDCMongoDBConnector的實現(xiàn)原理和使用實踐

摘要:本文整理自 XTransfer 資深 Java 開發(fā)工程師、Flink CDC Maintainer 孫家寶在 Flink CDC Meetup 的演講。主要內(nèi)容包括:

公司主營業(yè)務(wù):成都網(wǎng)站設(shè)計、成都網(wǎng)站建設(shè)、移動網(wǎng)站開發(fā)等業(yè)務(wù)。幫助企業(yè)客戶真正實現(xiàn)互聯(lián)網(wǎng)宣傳,提高企業(yè)的競爭能力。創(chuàng)新互聯(lián)是一支青春激揚、勤奮敬業(yè)、活力青春激揚、勤奮敬業(yè)、活力澎湃、和諧高效的團(tuán)隊。公司秉承以“開放、自由、嚴(yán)謹(jǐn)、自律”為核心的企業(yè)文化,感謝他們對我們的高要求,感謝他們從不同領(lǐng)域給我們帶來的挑戰(zhàn),讓我們激情的團(tuán)隊有機會用頭腦與智慧不斷的給客戶帶來驚喜。創(chuàng)新互聯(lián)推出南海免費做網(wǎng)站回饋大家。

  1. MongoDB Change Stream 技術(shù)簡介
  2. MongoDB CDC Connector 業(yè)務(wù)實踐
  3. MongoDB CDC Connector 生產(chǎn)調(diào)優(yōu)
  4. MongoDB CDC Connector 并行化 Snapshot 改進(jìn)
  5. 后續(xù)規(guī)劃

01MongoDB Change Stream 技術(shù)簡介

MongoDB 是一種面向文檔的非關(guān)系型數(shù)據(jù)庫,支持半結(jié)構(gòu)化數(shù)據(jù)存儲;也是一種分布式的數(shù)據(jù)庫,提供副本集和分片集兩種集群部署模式,具有高可用和水平擴展的能力,比較適合大規(guī)模的數(shù)據(jù)存儲。另外, MongoDB 4.0 版本還提供了多文檔事務(wù)的支持,對于一些比較復(fù)雜的業(yè)務(wù)場景更加友好。

MongoDB 使用了弱結(jié)構(gòu)化的存儲模式,支持靈活的數(shù)據(jù)結(jié)構(gòu)和豐富的數(shù)據(jù)類型,適合 Json 文檔、標(biāo)簽、快照、地理位置、內(nèi)容存儲等業(yè)務(wù)場景。它天然的分布式架構(gòu)提供了開箱即用的分片機制和自動 rebalance 能力,適合大規(guī)模數(shù)據(jù)存儲。另外, MongoDB 還提供了分布式網(wǎng)格文件存儲的功能,即 GridFS,適合圖片、音頻、視頻等大文件存儲。

MongoDB 提供了副本集和分片集兩種集群模部署模式。

副本集:高可用的部署模式,次要節(jié)點通過拷貝主要節(jié)點的操作日志來進(jìn)行數(shù)據(jù)的復(fù)制。當(dāng)主要節(jié)點發(fā)生故障時,次要節(jié)點和仲裁節(jié)點會重新發(fā)起投票來選出新的主要節(jié)點,實現(xiàn)故障轉(zhuǎn)移。另外,次要節(jié)點還能分擔(dān)查詢請求,減輕主要節(jié)點的查詢壓力。

分片集:水平擴展的部署模式,將數(shù)據(jù)均勻分散在不同 Shard 上,每個 Shard 可以部署為一個副本集,Shard 中主要節(jié)點承載讀寫請求,次要節(jié)點會復(fù)制主要節(jié)點的操作日志,能夠根據(jù)指定的分片索引和分片策略將數(shù)據(jù)切分成多個 16MB 的數(shù)據(jù)塊,并將這些數(shù)據(jù)塊交給不同 Shard 進(jìn)行存儲。Config Servers 中會記錄 Shard 和數(shù)據(jù)塊的對應(yīng)關(guān)系。

MongoDB 的 Oplog 與 MySQL 的 Binlog 類似,記錄了數(shù)據(jù)在 MongoDB 中所有的操作日志。Oplog 是一個有容量的集合,如果超出預(yù)設(shè)的容量范圍,則會丟棄先前的信息。

與 MySQL 的 Binlog 不同, Oplog 并不會記錄變更前/后的完整信息。遍歷 Oplog 的確可以捕獲 MongoDB 的數(shù)據(jù)變更,但是想要轉(zhuǎn)換成 Flink 支持的 Changelog 依然存在一些限制。

首先,訂閱 Oplog 難度較大。每個副本集會維護(hù)自己的 Oplog, 對于分片集群來說,每個 Shard 可能是一個獨立的副本集,需要遍歷每個 Shard 的 Oplog 并按照操作時間進(jìn)行排序。另外, Oplog 沒有包含變更文檔前和變更后的完整狀態(tài),因此既不能轉(zhuǎn)換成 Flink 標(biāo)準(zhǔn)的 Changelog ,也不能轉(zhuǎn)換成 Upsert 類型的 Changelog 。這亦是我們在實現(xiàn) MongoDB CDC Connector 的時候沒有采用直接訂閱 Oplog 方案的主要原因。

最終我們選擇使用 MongoDB Change Streams 方案來實現(xiàn) MongoDB CDC Connector。

Change Streams 是 MongoDB 3.6 版本提供的新特性,它提供了更簡單的變更數(shù)據(jù)捕獲接口,屏蔽了直接遍歷 Oplog 的復(fù)雜度。Change Streams 還提供了變更后文檔完整狀態(tài)的提取功能,可以輕松轉(zhuǎn)換成 Flink Upsert 類型的 Changelog。它還提供了比較完整的故障恢復(fù)能力,每一條變更記錄數(shù)據(jù)都會包含一個 resume token 來記錄當(dāng)前變更流的位置。故障發(fā)生后,可以通過 resume token 從當(dāng)前消費點進(jìn)行恢復(fù)。

另外, Change Streams 支持變更事件的篩選和定制化的功能。比如可以將數(shù)據(jù)庫和集合名稱的正則過濾器下推到 MongoDB 來完成,可以明顯減少網(wǎng)絡(luò)開銷。它還提供了對集合庫以及整個集群級別的變更訂閱,能夠支持相應(yīng)的權(quán)限控制。

使用 MongoDB Change Streams 特性實現(xiàn)的 CDC Connector 如上圖所示。首先通過 Change Streams 訂閱 MongoDB 的變更。比如有 insert、update、delete、replace 四種變更類型,先將其轉(zhuǎn)換成 Flink 支持的 upsert  Changelog,便可以在其之上定義成一張動態(tài)表,使用 Flink SQL 進(jìn)行處理。

目前 MongoDB CDC Connector 支持 Exactly-Once 語義,支持全量加增量的訂閱,支持從檢查點、保存點恢復(fù),支持 Snapshot 數(shù)據(jù)的過濾,支持?jǐn)?shù)據(jù)庫的 Database、Collection 等元數(shù)據(jù)的提取,也支持庫集合的正則篩選功能。

02MongoDB CDC Connector 業(yè)務(wù)實踐

XTransfer 成立于 2017 年,聚焦于 B2B 跨境支付業(yè)務(wù),為從事跨境電商出口的中小微企業(yè)提供外貿(mào)收款以及風(fēng)控服務(wù)??缇?B 類業(yè)務(wù)結(jié)算場景涉及的業(yè)務(wù)鏈路很長,從詢盤到最終的成交,過程中涉及物流條款、支付條款等,需要在每個環(huán)節(jié)上做好風(fēng)險管控,以符合跨境資金交易的監(jiān)管要求。

以上種種因素對 XTransfer 的數(shù)據(jù)處理安全性和準(zhǔn)確性都提出了更高的要求。在此基礎(chǔ)上,XTransfer 基于 Flink 搭建了自己的大數(shù)據(jù)平臺,能夠有效保障在跨境 B2B 全鏈路上的數(shù)據(jù)能夠被有效地采集、加工和計算,并滿足了高安全、低延遲、高精度的需求。

變更數(shù)據(jù)采集 CDC 是數(shù)據(jù)集成的關(guān)鍵環(huán)節(jié)。在沒有使用 Flink CDC  之前,一般使用 Debezium、Canal 等傳統(tǒng) CDC 工具來抽取數(shù)據(jù)庫的變更日志,并將其轉(zhuǎn)發(fā)到 Kafka 中,下游讀取 Kafka 中的變更日志進(jìn)行消費。這種架構(gòu)存在以下痛點:

  • 部署組件多,運維成本較高;
  • 下游數(shù)據(jù)消費邏輯需要根據(jù)寫入端進(jìn)行適配,存在一定的開發(fā)成本;
  • 數(shù)據(jù)訂閱配置較復(fù)雜,無法像 Flink CDC 一樣僅通過 SQL 語句便定義出一個完整的數(shù)據(jù)同步邏輯;
  • 難以全部滿足全量 + 增量采集,可能需要引入 DataX 等全量采集組件;
  • 比較偏向于對變更數(shù)據(jù)的采集,對數(shù)據(jù)的處理過濾能力較為薄弱;
  • 難以滿足異構(gòu)數(shù)據(jù)源打?qū)挼膱鼍啊?/li>

目前我們的大數(shù)據(jù)平臺主要使用 Flink CDC 來進(jìn)行變更數(shù)據(jù)捕獲,它具有如下優(yōu)勢:

1. 實時數(shù)據(jù)集成

  • 無須額外部署 Debezium、Canal、Datax 等組件,運維成本大幅降低;
  • 支持豐富的數(shù)據(jù)源,也可復(fù)用 Flink 既有的 connectors 進(jìn)行數(shù)據(jù)采集寫入,可以覆蓋大多數(shù)業(yè)務(wù)場景;
  • 降低了開發(fā)難度,僅通過 Flink SQL 就可以定義出完整的數(shù)據(jù)集成工作流程;
  • 數(shù)據(jù)處理能力較強,依托于 Flink 平臺強大的計算能力可以實現(xiàn)流式 ETL 甚至異構(gòu)數(shù)據(jù)源的 join、group by 等。

2. 構(gòu)建實時數(shù)倉

  • 大幅簡化實時數(shù)倉的部署難度,通過 Flink CDC 實時采集數(shù)據(jù)庫的變更,并寫入 Kafka、Iceberg、Hudi、TiDB 等數(shù)據(jù)庫中,即可使用 Flink 進(jìn)行深度的數(shù)據(jù)挖掘和數(shù)據(jù)處理。
  • Flink 的計算引擎可以支持流批一體的計算模式,不用再維護(hù)多套計算引擎,可以大幅降低數(shù)據(jù)的開發(fā)成本。

3. 實時風(fēng)控

  • 實時風(fēng)控以往一般采取往 Kafka 中發(fā)業(yè)務(wù)事件的方式實現(xiàn),而使用 Flink CDC 之后,可以直接從業(yè)務(wù)庫中捕獲風(fēng)控事件,然后通過 Flink CDC 來進(jìn)行復(fù)雜的事件處理。
  • 可以運行模型,以通過 Flink ML、Alink 來豐富機器學(xué)習(xí)的能力。最后將這些實時風(fēng)控的處置結(jié)果回落進(jìn) Kafka,下達(dá)風(fēng)控指令。

03MongoDB CDC Connector 生產(chǎn)調(diào)優(yōu)

MongoDB CDC Connector 的使用有如下幾點要求:

  • 鑒于使用了 Change Streams 的特性來實現(xiàn) MongoDB CDC Connector, 因此要求 MongoDB 的最小可用版本是 3.6,比較推薦 4.0.8 及以上版本。
  • 必須使用集群部署模式。由于訂閱 MongoDB 的 Change Streams 要求節(jié)點之間能夠進(jìn)行相互復(fù)制數(shù)據(jù),單機 MongoDB 無法進(jìn)行數(shù)據(jù)的互相拷貝,也沒有 Oplog,只有副本集或分片集的情況下才有數(shù)據(jù)復(fù)制機制。
  • 需要使用 WireTiger 存儲引擎,使用 pv1 復(fù)制協(xié)議。
  • 需要擁有 ChangeStream 和 find 用戶權(quán)限。

使用 MongoDB CDC Connector 時要注意設(shè)置 Oplog 的容量和過期時間。MongoDB oplog 是一個特殊的有容量集合,容量達(dá)到最大值后,會丟棄歷史數(shù)據(jù)。而 Change Streams 通過 resume token 來進(jìn)行恢復(fù),太小的 oplog 容量可能會導(dǎo)致 resume token 對應(yīng)的 oplog 記錄不再存在,即 resume token 過期,進(jìn)而導(dǎo)致 Change Streams 無法被恢復(fù)。

可以使用 replSetResizeOplog 設(shè)置 oplog 容量和最短保留時間,MongoDB 4.4 版本之后也支持設(shè)置最小時間。一般而言,生產(chǎn)環(huán)境中建議 oplog 保留不小于 7 天。

對一些變更較慢的表,建議在配置中開啟心跳事件。變更事件和心跳事件可以同時向前推進(jìn) resume token,對于變更較慢的表,可以通過心跳事件來刷新 resume token 避免其過期。

可以通過 heartbeat.interval.ms 設(shè)置心跳的間隔。

由于只能將 MongoDB 的 Change Streams 轉(zhuǎn)換成 Flink 的 Upsert changelog,它類似于 Upsert Kafka 形式,為了補齊 –U 前置鏡像值,會增加一個算子 ChangelogNormalize,而這會帶來額外的狀態(tài)開銷。因此在生產(chǎn)環(huán)境中比較推薦使用 RocksDB State Backend。

當(dāng)默認(rèn)連接的參數(shù)無法滿足使用需求時,可以通過設(shè)置 connection.options 配置項來傳遞 MongoDB 支持的連接參數(shù)。

比如連接 MongoDB 的用戶創(chuàng)建的數(shù)據(jù)庫不在 admin 中,可以設(shè)置參數(shù)來指定需要使用哪個數(shù)據(jù)庫來認(rèn)證當(dāng)前用戶,也可以設(shè)置連接池的最大連接參數(shù)等,MongoDB 的連接字符串默認(rèn)支持這些參數(shù)。

正則匹配多庫、多表是 MongoDB CDC Connector 在 2.0 版本之后提供的新功能。需要注意,如果數(shù)據(jù)庫名稱使用了正則參數(shù),則需要擁有 readAnyDatabase 角色。因為 MongoDB 的 Change Streams 只能在整個集群、數(shù)據(jù)庫以及 collection 粒度上開啟。如果需要對整個數(shù)據(jù)庫進(jìn)行過濾,那么數(shù)據(jù)庫進(jìn)行正則匹配時只能在整個集群上開啟 Change Streams ,然后通過 Pipeline 過濾數(shù)據(jù)庫的變更。可以通過在 Ddatabase 和 Collection 兩個參數(shù)中寫入正則表達(dá)式進(jìn)行多庫、多表的訂閱。

04MongoDB CDC Connector并行化 Snapshot 改進(jìn)

為了加速 Snapshot 的速度,可以使用 Flip-27 引入的 source 來進(jìn)行并行化改造。首先使用一個 split 枚舉器,根據(jù)一定的切分策略,將一個完整的 Snapshot 任務(wù)拆分成若干個子任務(wù),然后分配給多個 split reader 并行做 Snapshot ,以此提升整體任務(wù)的運行速度。

但是在 MongoDB 里,大多情況下組件是 ObjectID,其中前面四個字節(jié)是 UNIX 描述,中間五個字節(jié)是一個隨機值,后面三個字節(jié)是一個自增量。在相同描述里插入的文檔并不是嚴(yán)格遞增的,中間的隨機值可能會影響局部的嚴(yán)格遞增,但從總體來看,依然能夠滿足遞增趨勢。

因此,不同于 MySQL 的遞增組件,MongoDB 并不適合采用 offset + limit 的切分策略對其集合進(jìn)行簡單拆分,需要針對 ObjectID 采用針對性的切分策略。

最終,我們采取了以下三種 MongoDB 切分策略:

  • Sample 采樣分桶:原理是利用 $sample 命令對 collection 進(jìn)行隨機采樣,通過平均文檔大小和每個 chunk 的大小來預(yù)估需要的分桶數(shù)。要求相應(yīng)集合的查詢權(quán)限,其優(yōu)點是速度較快,適用于數(shù)據(jù)量大但是沒有分片的集合;缺點是由于使用了抽樣預(yù)估模式,分桶的結(jié)果不能做到絕對均勻。
  • SplitVector 索引切分:SplitVector 是 MongoDB 計算 chunk 分裂點的內(nèi)部命令,通過訪問指定的索引計算出每個 chunk 的邊界。要求擁有 SplitVector 權(quán)限,其優(yōu)點是速度快,chunk 結(jié)果均勻;缺點是對于數(shù)據(jù)量大且已經(jīng)分片的集合,不如直接讀取 config 庫中已經(jīng)分好的 chunks 元數(shù)據(jù)。
  • Chunks 元數(shù)據(jù)讀?。阂驗?MongoDB 在 config 數(shù)據(jù)庫會存儲分片集合的實際分片結(jié)果,因此可以直接從 config 中讀取分片集合的實際分片結(jié)果。要求擁有 config 庫讀取權(quán)限,僅限于分片集合使用。其優(yōu)點是速度快,無須重新計算 chunk 分裂點,chunk 結(jié)果均勻,默認(rèn)情況下為 64MB;缺點是不能滿足所有場景,僅限分片場景。

上圖為 sample 采樣分桶示例。左側(cè)是一個完整的集合,從完整的集合中設(shè)定樣本數(shù)量,然后將整個樣本縮小,并根據(jù)采樣以后的樣本進(jìn)行分桶,最終結(jié)果就是我們希望的 chunks 邊界。

sample 命令是 MongoDB 采樣的一個內(nèi)置命令。在樣本值小于 5% 的情況下,使用偽隨機算法進(jìn)行采樣;樣本值大于 5% 的情況下,先使用隨機排序,然后選擇前 N 個文檔。它的均勻度和耗時主要取決于隨機算法和樣本的數(shù)量,是一種均勻程度和切分速度的折中策略,適合于要求切分速度快,但可以容忍切分結(jié)果不太均勻的場景。

在實際測試中,sample 采樣的均勻程度有著不錯的表現(xiàn)。

上圖為 SplitVector 索引切分示例。左側(cè)是原始集合,通過 SplitVector 命令指定需要訪問的索引,為 ID 索引??梢栽O(shè)置每個 chunk 的大小,單位為 MB,然后使用 SplitVector 命令訪問索引,并通過索引計算每個塊的邊界。

它速度快,chunk 結(jié)果也很均勻,適用于大部分場景。

上圖為 config.chuncks 讀取示例,即直接讀取 MongoDB 已經(jīng)分好的 chunks 元數(shù)據(jù)。在 Config Server 中會存儲每個 Shard、其所在機器以及每個 Shard 的邊界。對于分片集合,可以直接在 chunks 中讀取它的邊界信息,無須重復(fù)計算這些分裂點,也可以保證每一個 chunk 的讀取在單臺機器上就能完成,速度極快,在大規(guī)模的分片集合場景下有著很好的表現(xiàn)。

05后續(xù)規(guī)劃

Flink CDC 的后續(xù)規(guī)劃主要分為以下五個方面:

  • 第一,協(xié)助完善 Flink CDC 增量 Snapshot 框架;
  • 第二,使用 MongoDB CDC 對接 Flink CDC 增量 Snapshot 框架,使其能夠支持并行 Snapshot 改進(jìn);
  • 第三,MongoDB CDC 支持 Flink RawType。對于一些比較靈活的存儲結(jié)構(gòu)提供 RawType 轉(zhuǎn)換,用戶可以通過 UDF 的形式對其進(jìn)行自定義解析;
  • 第四,MongoDB CDC 支持從指定位置進(jìn)行變更數(shù)據(jù)的采集;
  • 第五,MongoDB CDC 穩(wěn)定性的優(yōu)化。

提問&解答

Q1?:MongoDB CDC 延遲高嗎?是否需要通過犧牲性能來降低延遲?

MongoDB CDC 延遲不高,在全量采集的時候經(jīng)過 changelog normalize 可能會對于 CDC 的增量采集造成一些背壓,但是這種情況可以通過 MongoDB 并行化改造、增加資源的方式來避免。

Q2?:默認(rèn)連接什么時候無法滿足要求?

MongoDB 的用戶可以在任何數(shù)據(jù)庫、任何子庫中進(jìn)行創(chuàng)建。如果不是在 admin 的數(shù)據(jù)庫中創(chuàng)建用戶,認(rèn)證的時候需要顯示地指定要在哪個數(shù)據(jù)庫中認(rèn)證用戶,也可以設(shè)置最大的連接大小等參數(shù)。

Q3?:MongoDB 目前的 DBlog 支持無鎖并發(fā)讀取嗎?

DBlog 的無鎖并發(fā)擁有增量快照的能力,但是因為 MongoDB 難以獲取當(dāng)前 changelog 的位點,所以增量快照無法立刻實現(xiàn),但無鎖并發(fā)的 Snapshot 即將支持。


當(dāng)前標(biāo)題:FlinkCDCMongoDBConnector的實現(xiàn)原理和使用實踐
文章地址:http://www.5511xx.com/article/cosjepo.html