日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢(xún)
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問(wèn)題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷(xiāo)解決方案
聊聊FlinkSQL增量查詢(xún)Hudi表

?官網(wǎng)文檔

地址:https://hudi.apache.org/cn/docs/querying_data#incremental-query

成都創(chuàng)新互聯(lián)主營(yíng)皮山網(wǎng)站建設(shè)的網(wǎng)絡(luò)公司,主營(yíng)網(wǎng)站建設(shè)方案,App定制開(kāi)發(fā),皮山h5重慶小程序開(kāi)發(fā)公司搭建,皮山網(wǎng)站營(yíng)銷(xiāo)推廣歡迎皮山等地區(qū)企業(yè)咨詢(xún)

參數(shù)

  • read.start-commit 增量查詢(xún)開(kāi)始時(shí)間 對(duì)于流讀,如果不指定該值,默認(rèn)取最新的instantTime,也就是流讀默認(rèn)從最新的instantTime開(kāi)始讀(包含最新的)。對(duì)于批讀,如果不指定該參數(shù),只指定read.end-commit,則實(shí)現(xiàn)時(shí)間旅行的功能,可查詢(xún)歷史記錄
  • read.end-commit 增量查詢(xún)結(jié)束時(shí)間 不指定該參數(shù)則默認(rèn)讀取到最新的記錄,該參數(shù)一般只適用于批讀,因?yàn)榱髯x一般的需求是查詢(xún)所有的增量數(shù)據(jù)
  • read.streaming.enabled 是否流讀 默認(rèn)false
  • read.streaming.check-interval  流讀的檢查時(shí)間間隔,單位秒(s),默認(rèn)值60,也就是一分鐘查詢(xún)范圍 [BEGIN_INSTANTTIME,END_INSTANTTIME],既包含開(kāi)始時(shí)間又包含結(jié)束時(shí)間,對(duì)于默認(rèn)值可參考上面的參數(shù)說(shuō)明

版本

建表造數(shù):

  • Hudi 0.9.0
  • Spark 2.4.5

我這里建表造數(shù)使用Hudi Spark SQL 0.9.0,目的是為了模擬項(xiàng)目上用Java Client和Spark SQL創(chuàng)建的Hudi表,以驗(yàn)證Hudi Flink SQL增量查詢(xún)時(shí)是否兼容舊版本的Hudi表(大家沒(méi)有這種需求的,可以使用任何方式正常造數(shù))

查詢(xún)

  • Hudi 0.13.0-SNAPSHOT
  • Flink 1.14.3 (增量查詢(xún))
  • Spark 3.1.2 (主要是為了使用Call Procedures命令查看commit信息)

建表造數(shù)

-- Spark SQL Hudi 0.9.0
create table hudi.test_flink_incremental (
id int,
name string,
price double,
ts long,
dt string
) using hudi
partitioned by (dt)
options (
primaryKey = 'id',
preCombineField = 'ts',
type = 'cow'
);

insert into hudi.test_flink_incremental values (1,'a1', 10, 1000, '2022-11-25');
insert into hudi.test_flink_incremental values (2,'a2', 20, 2000, '2022-11-25');
update hudi.test_flink_incremental set name='hudi2_update' where id = 2;
insert into hudi.test_flink_incremental values (3,'a3', 30, 3000, '2022-11-26');
insert into hudi.test_flink_incremental values (4,'a4', 40, 4000, '2022-12-26');

用show_commits看一下有哪些commits(這里查詢(xún)用的是Hudi的master,因?yàn)閟how_commits是在0.11.0版本開(kāi)始支持的,也可以通過(guò)使用hadoop命令查看.hoodie文件夾下的.commit文件)

call show_commits(table => 'hudi.test_flink_incremental');
20221205152736
20221205152723
20221205152712
20221205152702
20221205152650

Flink SQL創(chuàng)建Hudi內(nèi)存表

CREATE TABLE test_flink_incremental (
id int PRIMARY KEY NOT ENFORCED,
name VARCHAR(10),
price double,
ts bigint,
dt VARCHAR(10)
)
PARTITIONED BY (dt)
WITH (
'connector' = 'hudi',
'path' = 'hdfs://cluster1/warehouse/tablespace/managed/hive/hudi.db/test_flink_incremental'
);

建表時(shí)不指定增量查詢(xún)相關(guān)的參數(shù),我們?cè)诓樵?xún)時(shí)動(dòng)態(tài)指定,這樣比較靈活。動(dòng)態(tài)指定參數(shù)方法,在查詢(xún)語(yǔ)句后面加上如下形式的語(yǔ)句

/*+ 
options(
'read.start-commit' = '20221205152723',
'read.end-commit'='20221205152736'
)
*/

批讀

Flink SQL讀Hudi有兩種模式:批讀和流讀。默認(rèn)批讀,先看一下批讀的增量查詢(xún)

驗(yàn)證是否包含起始時(shí)間和默認(rèn)結(jié)束時(shí)間

select * from test_flink_incremental 
/*+
options(
'read.start-commit' = '20221205152723' --起始時(shí)間對(duì)應(yīng)id=3的記錄
)
*/

結(jié)果包含起始時(shí)間,不指定結(jié)束時(shí)間默認(rèn)讀到最新的數(shù)據(jù)

id   name     price        ts                 dt
4 a4 40.0 4000 dt=2022-12-26
3 a3 30.0 3000 dt=2022-11-26

驗(yàn)證是否包含結(jié)束時(shí)間

select * from test_flink_incremental 
/*+
options(
'read.start-commit' = '20221205152712', --起始時(shí)間對(duì)應(yīng)id=2的記錄
'read.end-commit'='20221205152723' --結(jié)束時(shí)間對(duì)應(yīng)id=3的記錄
)
*/

結(jié)果包含結(jié)束時(shí)間

id           name        price       ts                 dt
3 a3 30.0 3000 dt=2022-11-26
2 hudi2_update 20.0 2000 dt=2022-11-25

驗(yàn)證默認(rèn)開(kāi)始時(shí)間

這種情況是指定結(jié)束時(shí)間,但不指定開(kāi)始時(shí)間,如果都不指定,則讀表所有的最新版本的記錄。

select * from test_flink_incremental 
/*+
options(
'read.end-commit'='20221205152712' --結(jié)束時(shí)間對(duì)應(yīng)id=2的更新記錄
)
*/

結(jié)果:只查詢(xún)end-commit對(duì)應(yīng)的記錄

id           name        price       ts                 dt
2 hudi2_update 20.0 2000 dt=2022-11-25

時(shí)間旅行(查詢(xún)歷史記錄)

驗(yàn)證是否可以查詢(xún)歷史記錄,我們更新id為2的name,更新前name為a2,更新后為hudi2_update,我們驗(yàn)證一下,是否可以通過(guò)FlinkSQL查詢(xún)Hudi歷史記錄,預(yù)期結(jié)果查出id=2,name=a2

select * from test_flink_incremental 
/*+
options(
'read.end-commit'='20221205152702' --結(jié)束時(shí)間對(duì)應(yīng)id=2的歷史記錄
)
*/

結(jié)果:可以正確查詢(xún)歷史記錄

id           name        price       ts                 dt
2 a2 20.0 2000 dt=2022-11-25

流讀

開(kāi)啟流讀的參數(shù):

read.streaming.enabled = true

流讀不需要設(shè)置結(jié)束時(shí)間,因?yàn)橐话愕男枨笫亲x所有的增量數(shù)據(jù),我們只需要驗(yàn)證開(kāi)始時(shí)間就好了

驗(yàn)證默認(rèn)開(kāi)始時(shí)間

select * from test_flink_incremental 
/*+
options(
'read.streaming.enabled'='true',
'read.streaming.check-interval' = '4'
)
*/

結(jié)果:從最新的instantTime開(kāi)始增量讀取,也就是默認(rèn)的read.start-commit為最新的instantTime

id   name     price        ts                 dt
4 a4 40.0 4000 dt=2022-12-26

驗(yàn)證指定開(kāi)始時(shí)間

select * from test_flink_incremental 
/*+
options(
'read.streaming.enabled'='true',
'read.streaming.check-interval' = '4',
'read.start-commit' = '20221205152712'
)
*/

結(jié)果:

id           name        price       ts                 dt
2 hudi2_update 20.0 2000 dt=2022-11-25
3 a3 30.0 3000 dt=2022-11-26
4 a4 40.0 4000 dt=2022-11-26

如果想第一次查詢(xún)?nèi)康臍v史數(shù)據(jù),可以將start-commit設(shè)置的早一點(diǎn),比如設(shè)置到去年:'read.start-commit' = '20211205152712'

select * from test_flink_incremental 
/*+
options(
'read.streaming.enabled'='true',
'read.streaming.check-interval' = '4',
'read.start-commit' = '20211205152712'
)
*/
id           name        price       ts                 dt
1 a1 10.0 1000 dt=2022-11-25
2 hudi2_update 20.0 2000 dt=2022-11-25
3 a3 30.0 3000 dt=2022-11-26
4 a4 40.0 4000 dt=2022-11-26

驗(yàn)證流讀的連續(xù)性

驗(yàn)證新的增量數(shù)據(jù)進(jìn)來(lái),是否可以持續(xù)消費(fèi)Hudi增量數(shù)據(jù),驗(yàn)證數(shù)據(jù)的準(zhǔn)確一致性,為了方便驗(yàn)證,我可以使用Flink SQL增量流讀Hudi表然后Sink到MySQL表中,最后通過(guò)讀取MySQL表中的數(shù)據(jù)驗(yàn)證數(shù)據(jù)的準(zhǔn)確性

Flink SQL讀寫(xiě)MySQL需要配置jar包,將flink-connector-jdbc_2.12-1.14.3.jar?放到lib?下即可,下載地址:https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc_2.12/1.14.3/flink-connector-jdbc_2.12-1.14.3.jar

先在MySQL中創(chuàng)建一張Sink表

-- MySQL
CREATE TABLE `test_sink` (
`id` int(11),
`name` text DEFAULT NULL,
`price` int(11),
`ts` int(11),
`dt` text DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Flink中創(chuàng)建對(duì)應(yīng)的sink表

create table test_sink (
id int,
name string,
price double,
ts bigint,
dt string
) with (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://192.468.44.128:3306/hudi?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8',
'username' = 'root',
'password' = 'root-123',
'table-name' = 'test_sink',
'sink.buffer-flush.max-rows' = '1'
);

然后流式增量讀取Hudi表Sink Mysql

insert into test_sink
select * from test_flink_incremental
/*+
options(
'read.streaming.enabled'='true',
'read.streaming.check-interval' = '4',
'read.start-commit' = '20221205152712'
)
*/

這樣會(huì)起一個(gè)長(zhǎng)任務(wù),一直處于running狀態(tài),我們可以在yarn-session界面上驗(yàn)證這一點(diǎn)

然后先在MySQL中驗(yàn)證一下歷史數(shù)據(jù)的準(zhǔn)確性

再利用Spark SQL往source表插入兩條數(shù)據(jù)

-- Spark SQL
insert into hudi.test_flink_incremental values (5,'a5', 50, 5000, '2022-12-07');
insert into hudi.test_flink_incremental values (6,'a6', 60, 6000, '2022-12-07');

我們?cè)隽孔x取的間隔設(shè)置的4s,成功插入數(shù)據(jù)等待4s后,再在MySQL表中驗(yàn)證一下數(shù)據(jù)

發(fā)現(xiàn)新增的數(shù)據(jù)已經(jīng)成功Sink到MySQL中了,并且數(shù)據(jù)沒(méi)有重復(fù)

最后驗(yàn)證一下更新的增量數(shù)據(jù),Spark SQL更新Hudi source表

-- Spark SQL
update hudi.test_flink_incremental set name='hudi5_update' where id = 5;

繼續(xù)驗(yàn)證結(jié)果

結(jié)果是更新的增量數(shù)據(jù)也會(huì)insert到MySQL中的sink表,但是不會(huì)更新原來(lái)的數(shù)據(jù)

那如果想實(shí)現(xiàn)更新的效果呢?我們需要在MySQL和Flink的sink表中加上主鍵字段,兩者缺一不可,如下:

-- MySQL
CREATE TABLE `test_sink` (
`id` int(11),
`name` text DEFAULT NULL,
`price` int(11),
`ts` int(11),
`dt` text DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- Flink SQL
create table test_sink (
id int PRIMARY KEY NOT ENFORCED,
name string,
price double,
ts bigint,
dt string
) with (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://192.468.44.128:3306/hudi?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8',
'username' = 'root',
'password' = 'root-123',
'table-name' = 'test_sink',
'sink.buffer-flush.max-rows' = '1'
);

將剛才起的長(zhǎng)任務(wù)關(guān)掉,重新執(zhí)行剛才的insert語(yǔ)句,先跑一下歷史數(shù)據(jù),最后再驗(yàn)證一下增量效果

-- Spark SQL
update hudi.test_flink_incremental set name='hudi6_update' where id = 6;
insert into hudi.test_flink_incremental values (7,'a7', 70, 7000, '2022-12-07');

可以看到,達(dá)到了預(yù)期效果,對(duì)于id=6的執(zhí)行更新操作,對(duì)于id=7的執(zhí)行插入操作。


當(dāng)前名稱(chēng):聊聊FlinkSQL增量查詢(xún)Hudi表
URL鏈接:http://www.5511xx.com/article/dhihhis.html