日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢(xún)
選擇下列產(chǎn)品馬上在線(xiàn)溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問(wèn)題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷(xiāo)解決方案
Flink SQL 知其所以然:Over 聚合操作

架構(gòu)

大家好,我是老羊,今天我們來(lái)學(xué)習(xí) Flink SQL 中的· Over 聚合操作。

  • Over 聚合定義(支持 Batch\Streaming):可以理解為是一種特殊的滑動(dòng)窗口聚合函數(shù)。

那這里我們拿 Over 聚合? 與 窗口聚合 做一個(gè)對(duì)比,其之間的最大不同之處在于:

窗口聚合:不在 group by 中的字段,不能直接在 select 中拿到

Over 聚合:能夠保留原始字段

注意:其實(shí)在生產(chǎn)環(huán)境中,Over 聚合的使用場(chǎng)景還是比較少的。在 Hive 中也有相同的聚合,但是小伙伴萌可以想想你在離線(xiàn)數(shù)倉(cāng)經(jīng)常使用嘛?

  • 應(yīng)用場(chǎng)景:計(jì)算最近一段滑動(dòng)窗口的聚合結(jié)果數(shù)據(jù)。
  • 際案例:查詢(xún)每個(gè)產(chǎn)品最近一小時(shí)訂單的金額總和:

SELECT order_id, order_time, amount,
SUM(amount) OVER (
PARTITION BY product
ORDER BY order_time
RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
) AS one_hour_prod_amount_sum
FROM Orders

Over 聚合的語(yǔ)法總結(jié)如下:

SELECT
agg_func(agg_col) OVER (
[PARTITION BY col1[, col2, ...]]
ORDER BY time_col
range_definition),
...
FROM ...

其中:

  • ORDER BY:必須是時(shí)間戳列(事件時(shí)間、處理時(shí)間)
  • PARTITION BY:標(biāo)識(shí)了聚合窗口的聚合粒度,如上述案例是按照 product 進(jìn)行聚合
  • range_definition:這個(gè)標(biāo)識(shí)聚合窗口的聚合數(shù)據(jù)范圍,在 Flink 中有兩種指定數(shù)據(jù)范圍的方式。第一種為按照行數(shù)聚合?,第二種為按照時(shí)間區(qū)間聚合。如下案例所示:

a. 時(shí)間區(qū)間聚合:

按照時(shí)間區(qū)間聚合就是時(shí)間區(qū)間的一個(gè)滑動(dòng)窗口,比如下面案例 1 小時(shí)的區(qū)間,最新輸出的一條數(shù)據(jù)的 sum 聚合結(jié)果就是最近一小時(shí)數(shù)據(jù)的 amount 之和。

CREATE TABLE source_table (
order_id BIGINT,
product BIGINT,
amount BIGINT,
order_time as cast(CURRENT_TIMESTAMP as TIMESTAMP(3)),
WATERMARK FOR order_time AS order_time - INTERVAL '0.001' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.order_id.min' = '1',
'fields.order_id.max' = '2',
'fields.amount.min' = '1',
'fields.amount.max' = '10',
'fields.product.min' = '1',
'fields.product.max' = '2'
);

CREATE TABLE sink_table (
product BIGINT,
order_time TIMESTAMP(3),
amount BIGINT,
one_hour_prod_amount_sum BIGINT
) WITH (
'connector' = 'print'
);

INSERT INTO sink_table
SELECT product, order_time, amount,
SUM(amount) OVER (
PARTITION BY product
ORDER BY order_time
-- 標(biāo)識(shí)統(tǒng)計(jì)范圍是一個(gè) product 的最近 1 小時(shí)的數(shù)據(jù)
RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
) AS one_hour_prod_amount_sum
FROM source_table

結(jié)果如下:

+I[2, 2021-12-24T22:08:26.583, 7, 73]
+I[2, 2021-12-24T22:08:27.583, 7, 80]
+I[2, 2021-12-24T22:08:28.583, 4, 84]
+I[2, 2021-12-24T22:08:29.584, 7, 91]
+I[2, 2021-12-24T22:08:30.583, 8, 99]
+I[1, 2021-12-24T22:08:31.583, 9, 138]
+I[2, 2021-12-24T22:08:32.584, 6, 105]
+I[1, 2021-12-24T22:08:33.584, 7, 145]

b.  行數(shù)聚合:

按照行數(shù)聚合就是數(shù)據(jù)行數(shù)的一個(gè)滑動(dòng)窗口,比如下面案例,最新輸出的一條數(shù)據(jù)的 sum 聚合結(jié)果就是最近 5 行數(shù)據(jù)的 amount 之和。

CREATE TABLE source_table (
order_id BIGINT,
product BIGINT,
amount BIGINT,
order_time as cast(CURRENT_TIMESTAMP as TIMESTAMP(3)),
WATERMARK FOR order_time AS order_time - INTERVAL '0.001' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.order_id.min' = '1',
'fields.order_id.max' = '2',
'fields.amount.min' = '1',
'fields.amount.max' = '2',
'fields.product.min' = '1',
'fields.product.max' = '2'
);

CREATE TABLE sink_table (
product BIGINT,
order_time TIMESTAMP(3),
amount BIGINT,
one_hour_prod_amount_sum BIGINT
) WITH (
'connector' = 'print'
);

INSERT INTO sink_table
SELECT product, order_time, amount,
SUM(amount) OVER (
PARTITION BY product
ORDER BY order_time
-- 標(biāo)識(shí)統(tǒng)計(jì)范圍是一個(gè) product 的最近 5 行數(shù)據(jù)
ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
) AS one_hour_prod_amount_sum
FROM source_table

預(yù)跑結(jié)果如下:

+I[2, 2021-12-24T22:18:19.147, 1, 9]
+I[1, 2021-12-24T22:18:20.147, 2, 11]
+I[1, 2021-12-24T22:18:21.147, 2, 12]
+I[1, 2021-12-24T22:18:22.147, 2, 12]
+I[1, 2021-12-24T22:18:23.148, 2, 12]
+I[1, 2021-12-24T22:18:24.147, 1, 11]
+I[1, 2021-12-24T22:18:25.146, 1, 10]
+I[1, 2021-12-24T22:18:26.147, 1, 9]
+I[2, 2021-12-24T22:18:27.145, 2, 11]
+I[2, 2021-12-24T22:18:28.148, 1, 10]
+I[2, 2021-12-24T22:18:29.145, 2, 10]

當(dāng)然,如果你在一個(gè) SELECT 中有多個(gè)聚合窗口的聚合方式,F(xiàn)link SQL 支持了一種簡(jiǎn)化寫(xiě)法,如下案例:

SELECT order_id, order_time, amount,
SUM(amount) OVER w AS sum_amount,
AVG(amount) OVER w AS avg_amount
FROM Orders
-- 使用下面子句,定義 Over Window
WINDOW w AS (
PARTITION BY product
ORDER BY order_time
RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW)


網(wǎng)站欄目:Flink SQL 知其所以然:Over 聚合操作
本文網(wǎng)址:http://www.5511xx.com/article/ccoioog.html