Flink SQL 知其所以然:Window TopN 操作
作者:antigeneral了呀
今天我們來學(xué)習(xí) Flink SQL 中的 Window TopN 操作。
大家好,我是老羊,今天我們來學(xué)習(xí) Flink SQL 中的 Window TopN 操作。
- Window TopN 定義(支持 Streaming):Window TopN 是一種特殊的 TopN,它的返回結(jié)果是每一個(gè)窗口內(nèi)的 N 個(gè)最小值或者最大值。
- 應(yīng)用場景:小伙伴萌會問了,我有了 TopN 為啥還需要 Window TopN 呢?還記得上文介紹 TopN 說道的 TopN 時(shí)會出現(xiàn)中間結(jié)果,從而出現(xiàn)回撤數(shù)據(jù)的嘛?Window TopN 不會出現(xiàn)回撤數(shù)據(jù),因?yàn)?Window TopN 實(shí)現(xiàn)是在窗口結(jié)束時(shí)輸出最終結(jié)果,不會產(chǎn)生中間結(jié)果。而且注意,因?yàn)槭谴翱谏厦娴牟僮?,Window TopN 在窗口結(jié)束時(shí),會自動把 State 給清除。
- SQL 語法標(biāo)準(zhǔn):
SELECT [column_list]
FROM (
SELECT [column_list],
ROW_NUMBER() OVER (PARTITION BY window_start, window_end [, col_key1...]
ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
FROM table_name) -- windowing TVF
WHERE rownum <= N [AND conditions]
- 實(shí)際案例:取當(dāng)前這一分鐘的搜索關(guān)鍵詞下的搜索熱度前 10 名的詞條數(shù)據(jù)。
輸入表字段:
-- 字段名 備注
-- key 搜索關(guān)鍵詞
-- name 搜索熱度名稱
-- search_cnt 熱搜消費(fèi)熱度(比如 3000)
-- timestamp 消費(fèi)詞條時(shí)間戳
CREATE TABLE source_table (
name BIGINT NOT NULL,
search_cnt BIGINT NOT NULL,
key BIGINT NOT NULL,
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
WATERMARK FOR row_time AS row_time
) WITH (
...
);
-- 輸出表字段:
-- 字段名 備注
-- key 搜索關(guān)鍵詞
-- name 搜索熱度名稱
-- search_cnt 熱搜消費(fèi)熱度(比如 3000)
-- window_start 窗口開始時(shí)間戳
-- window_end 窗口結(jié)束時(shí)間戳
CREATE TABLE sink_table (
key BIGINT,
name BIGINT,
search_cnt BIGINT,
window_start TIMESTAMP(3),
window_end TIMESTAMP(3)
) WITH (
...
);
-- 處理 sql:
INSERT INTO sink_table
SELECT key, name, search_cnt, window_start, window_end
FROM (
SELECT key, name, search_cnt, window_start, window_end,
ROW_NUMBER() OVER (PARTITION BY window_start, window_end, key
ORDER BY search_cnt desc) AS rownum
FROM (
SELECT window_start, window_end, key, name, max(search_cnt) as search_cnt
-- window tvf 寫法
FROM TABLE(TUMBLE(TABLE source_table, DESCRIPTOR(row_time), INTERVAL '1' MINUTES))
GROUP BY window_start, window_end, key, name
)
)
WHERE rownum <= 100
輸出結(jié)果:
+I[關(guān)鍵詞1, 詞條1, 8670, 2021-1-28T22:34, 2021-1-28T22:35]
+I[關(guān)鍵詞1, 詞條2, 6928, 2021-1-28T22:34, 2021-1-28T22:35]
+I[關(guān)鍵詞1, 詞條3, 1735, 2021-1-28T22:34, 2021-1-28T22:35]
+I[關(guān)鍵詞1, 詞條4, 7287, 2021-1-28T22:34, 2021-1-28T22:35]
...
可以看到結(jié)果是符合預(yù)期的,其中沒有回撤數(shù)據(jù)。
- SQL 語義。
- 數(shù)據(jù)源:數(shù)據(jù)源即最新的詞條下面的搜索詞的搜索熱度數(shù)據(jù),消費(fèi)到 Kafka 中數(shù)據(jù)后,將數(shù)據(jù)按照窗口聚合的 key 通過 hash 分發(fā)策略發(fā)送到下游窗口聚合算子。
- 窗口聚合算子:進(jìn)行窗口聚合計(jì)算,隨著時(shí)間的推進(jìn),將窗口聚合結(jié)果計(jì)算完成發(fā)往下游窗口排序算子。
- 窗口排序算子:這個(gè)算子其實(shí)也是一個(gè)窗口算子,只不過這個(gè)窗口算子為每個(gè) Key 維護(hù)了一個(gè) TopN 的榜單數(shù)據(jù),接受到上游發(fā)送的窗口結(jié)果數(shù)據(jù)進(jìn)行排序,隨著時(shí)間的推進(jìn),窗口的結(jié)束,將排序的結(jié)果輸出到下游數(shù)據(jù)匯算子。
- 數(shù)據(jù)匯:接收到上游的數(shù)據(jù)之后,然后輸出到外部存儲引擎中。
責(zé)任編輯:姜華
來源:
大數(shù)據(jù)羊說