新聞中心
DML:Order By、Limit 子句
大家好,我是老羊,今天我們來(lái)學(xué)習(xí) Flink SQL 中的 TopN、Order By、Limit 3個(gè)操作。

我們提供的服務(wù)有:做網(wǎng)站、成都網(wǎng)站制作、微信公眾號(hào)開(kāi)發(fā)、網(wǎng)站優(yōu)化、網(wǎng)站認(rèn)證、常山ssl等。為上1000+企事業(yè)單位解決了網(wǎng)站和推廣的問(wèn)題。提供周到的售前咨詢和貼心的售后服務(wù),是有科學(xué)管理、有技術(shù)的常山網(wǎng)站制作公司
1.Order By 子句
支持 Batch\Streaming,但在實(shí)時(shí)任務(wù)中一般用的非常少。
實(shí)時(shí)任務(wù)中,Order By 子句中必須要有時(shí)間屬性字段,并且時(shí)間屬性必須為升序時(shí)間屬性,即 WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND 或者 WATERMARK FOR rowtime_column AS rowtime_column。
舉例:
CREATE TABLE source_table_1 (
user_id BIGINT NOT NULL,
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
WATERMARK FOR row_time AS row_time
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.user_id.min' = '1',
'fields.user_id.max' = '10'
);
CREATE TABLE sink_table (
user_id BIGINT
) WITH (
'connector' = 'print'
);
INSERT INTO sink_table
SELECT user_id
FROM source_table_1
Order By row_time, user_id desc
2.Limit 子句
支持 Batch\Streaming,但實(shí)時(shí)場(chǎng)景一般不使用,但是此處依然舉一個(gè)例子:
CREATE TABLE source_table_1 (
user_id BIGINT NOT NULL,
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
WATERMARK FOR row_time AS row_time
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.user_id.min' = '1',
'fields.user_id.max' = '10'
);
CREATE TABLE sink_table (
user_id BIGINT
) WITH (
'connector' = 'print'
);
INSERT INTO sink_table
SELECT user_id
FROM source_table_1
Limit 3
結(jié)果如下,只有 3 條輸出:
+I[5]
+I[9]
+I[4]
DML:TopN 子句
- TopN 定義(支持 Batch\Streaming):TopN 其實(shí)就是對(duì)應(yīng)到離線數(shù)倉(cāng)中的 row_number(),可以使用 row_number() 對(duì)某一個(gè)分組的數(shù)據(jù)進(jìn)行排序
- 應(yīng)用場(chǎng)景:根據(jù) 某個(gè)排序 條件,計(jì)算某個(gè)分組下的排行榜數(shù)據(jù)
- SQL 語(yǔ)法標(biāo)準(zhǔn):
SELECT [column_list]
FROM (
SELECT [column_list],
ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
FROM table_name)
WHERE rownum <= N [AND conditions]
ROW_NUMBER():標(biāo)識(shí) TopN 排序子句
PARTITION BY col1[, col2...]:標(biāo)識(shí)分區(qū)字段,代表按照這個(gè) col 字段作為分區(qū)粒度對(duì)數(shù)據(jù)進(jìn)行排序取 topN,比如下述案例中的partition by key,就是根據(jù)需求中的搜索關(guān)鍵詞(key)做為分區(qū)
ORDER BY col1 [asc|desc][, col2 [asc|desc]...]:標(biāo)識(shí) TopN 的排序規(guī)則,是按照哪些字段、順序或逆序進(jìn)行排序
WHERE rownum <= N:這個(gè)子句是一定需要的,只有加上了這個(gè)子句,F(xiàn)link 才能將其識(shí)別為一個(gè) TopN 的查詢,其中 N 代表 TopN 的條目數(shù)
[AND conditions]:其他的限制條件也可以加上
- 實(shí)際案例:取某個(gè)搜索關(guān)鍵詞下的搜索熱度前 10 名的詞條數(shù)據(jù)。
輸入數(shù)據(jù)為搜索詞條數(shù)據(jù)的搜索熱度數(shù)據(jù),當(dāng)搜索熱度發(fā)生變化時(shí),會(huì)將變化后的數(shù)據(jù)寫(xiě)入到數(shù)據(jù)源的 Kafka 中:
數(shù)據(jù)源 schema:
-- 字段名 備注
-- key 搜索關(guān)鍵詞
-- name 搜索熱度名稱(chēng)
-- search_cnt 熱搜消費(fèi)熱度(比如 3000)
-- timestamp 消費(fèi)詞條時(shí)間戳
CREATE TABLE source_table (
name BIGINT NOT NULL,
search_cnt BIGINT NOT NULL,
key BIGINT NOT NULL,
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
WATERMARK FOR row_time AS row_time
) WITH (
...
);
-- 數(shù)據(jù)匯 schema:
-- key 搜索關(guān)鍵詞
-- name 搜索熱度名稱(chēng)
-- search_cnt 熱搜消費(fèi)熱度(比如 3000)
-- timestamp 消費(fèi)詞條時(shí)間戳
CREATE TABLE sink_table (
key BIGINT,
name BIGINT,
search_cnt BIGINT,
`timestamp` TIMESTAMP(3)
) WITH (
...
);
-- DML 邏輯
INSERT INTO sink_table
SELECT key, name, search_cnt, row_time as `timestamp`
FROM (
SELECT key, name, search_cnt, row_time,
-- 根據(jù)熱搜關(guān)鍵詞 key 作為 partition key,然后按照 search_cnt 倒排取前 100 名
ROW_NUMBER() OVER (PARTITION BY key
ORDER BY search_cnt desc) AS rownum
FROM source_table)
WHERE rownum <= 100
輸出結(jié)果:
-D[關(guān)鍵詞1, 詞條1, 4944]
+I[關(guān)鍵詞1, 詞條1, 8670]
+I[關(guān)鍵詞1, 詞條2, 1735]
-D[關(guān)鍵詞1, 詞條3, 6641]
+I[關(guān)鍵詞1, 詞條3, 6928]
-D[關(guān)鍵詞1, 詞條4, 6312]
+I[關(guān)鍵詞1, 詞條4, 7287]
可以看到輸出數(shù)據(jù)是有回撤數(shù)據(jù)的,為什么會(huì)出現(xiàn)回撤,我們來(lái)看看 SQL 語(yǔ)義。
- SQL 語(yǔ)義
上面的 SQL 會(huì)翻譯成以下三個(gè)算子:
數(shù)據(jù)源:數(shù)據(jù)源即最新的詞條下面的搜索詞的搜索熱度數(shù)據(jù),消費(fèi)到 Kafka 中數(shù)據(jù)后,按照 partition key 將數(shù)據(jù)進(jìn)行 hash 分發(fā)到下游排序算子,相同的 key 數(shù)據(jù)將會(huì)發(fā)送到一個(gè)并發(fā)中
排序算子:為每個(gè) Key 維護(hù)了一個(gè) TopN 的榜單數(shù)據(jù),接受到上游的一條數(shù)據(jù)后,如果 TopN 榜單還沒(méi)有到達(dá) N 條,則將這條數(shù)據(jù)加入 TopN 榜單后,直接下發(fā)數(shù)據(jù),如果到達(dá) N 條之后,經(jīng)過(guò) TopN 計(jì)算,發(fā)現(xiàn)這條數(shù)據(jù)比原有的數(shù)據(jù)排序靠前,那么新的 TopN 排名就會(huì)有變化,就變化了的這部分?jǐn)?shù)據(jù)之前下發(fā)的排名數(shù)據(jù)撤回(即回撤數(shù)據(jù)),然后下發(fā)新的排名數(shù)據(jù)
數(shù)據(jù)匯:接收到上游的數(shù)據(jù)之后,然后輸出到外部存儲(chǔ)引擎中
上面三個(gè)算子也是會(huì) 24 小時(shí)一直運(yùn)行的。
本文名稱(chēng):FlinkSQL知其所以然:TopN、OrderBy、Limit操作
路徑分享:http://www.5511xx.com/article/djjecjg.html


咨詢
建站咨詢
