成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Flink SQL 知其所以然:Group 聚合操作

數(shù)據(jù)庫 其他數(shù)據(jù)庫
架構(gòu)

Group 聚合

  • Group 聚合定義(支持 Batch\Streaming 任務(wù)):Flink 也支持 Group 聚合。Group 聚合和上面介紹到的窗口聚合的不同之處,就在于 Group 聚合是按照數(shù)據(jù)的類別進(jìn)行分組,比如年齡、性別,是橫向的;而窗口聚合是在時間粒度上對數(shù)據(jù)進(jìn)行分組,是縱向的。如下圖所示,就展示出了其區(qū)別。其中按顏色分 key(橫向)? 就是 Group 聚合,按窗口劃分(縱向) 就是窗口聚合。

圖片

tumble window + key

  • 應(yīng)用場景:一般用于對數(shù)據(jù)進(jìn)行分組,然后后續(xù)使用聚合函數(shù)進(jìn)行 count、sum 等聚合操作。

那么這時候,小伙伴萌就會問到,我其實可以把窗口聚合的寫法也轉(zhuǎn)換為 Group 聚合,只需要把 Group 聚合的 Group By key 換成時間就行,那這兩個聚合的區(qū)別到底在哪?

首先來舉一個例子看看怎么將窗口聚合轉(zhuǎn)換為 Group 聚合。假如一個窗口聚合是按照 1 分鐘的粒度進(jìn)行聚合,如下 SQL:

-- 數(shù)據(jù)源表
CREATE TABLE source_table (
-- 維度數(shù)據(jù)
dim STRING,
-- 用戶 id
user_id BIGINT,
-- 用戶
price BIGINT,
-- 事件時間戳
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
-- watermark 設(shè)置
WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.dim.length' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '100000',
'fields.price.min' = '1',
'fields.price.max' = '100000'
)

-- 數(shù)據(jù)匯表
CREATE TABLE sink_table (
dim STRING,
pv BIGINT,
sum_price BIGINT,
max_price BIGINT,
min_price BIGINT,
uv BIGINT,
window_start bigint
) WITH (
'connector' = 'print'
)

-- 數(shù)據(jù)處理邏輯
insert into sink_table
select dim,
count(*) as pv,
sum(price) as sum_price,
max(price) as max_price,
min(price) as min_price,
-- 計算 uv 數(shù)
count(distinct user_id) as uv,
UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '1' minute) AS STRING)) * 1000 as window_start
from source_table
group by
dim,
-- 按照 Flink SQL tumble 窗口寫法劃分窗口
tumble(row_time, interval '1' minute)

轉(zhuǎn)換為 Group 聚合的寫法如下:

Group 聚合

-- 數(shù)據(jù)源表
CREATE TABLE source_table (
-- 維度數(shù)據(jù)
dim STRING,
-- 用戶 id
user_id BIGINT,
-- 用戶
price BIGINT,
-- 事件時間戳
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
-- watermark 設(shè)置
WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.dim.length' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '100000',
'fields.price.min' = '1',
'fields.price.max' = '100000'
);

-- 數(shù)據(jù)匯表
CREATE TABLE sink_table (
dim STRING,
pv BIGINT,
sum_price BIGINT,
max_price BIGINT,
min_price BIGINT,
uv BIGINT,
window_start bigint
) WITH (
'connector' = 'print'
);

-- 數(shù)據(jù)處理邏輯
insert into sink_table
select dim,
count(*) as pv,
sum(price) as sum_price,
max(price) as max_price,
min(price) as min_price,
-- 計算 uv 數(shù)
count(distinct user_id) as uv,
cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint) as window_start
from source_table
group by
dim,
-- 將秒級別時間戳 / 60 轉(zhuǎn)化為 1min
cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint)

確實沒錯,上面這個轉(zhuǎn)換是一點問題都沒有的。

但是窗口聚合和 Group by 聚合的差異在于:

本質(zhì)區(qū)別:窗口聚合是具有時間語義的,其本質(zhì)是想實現(xiàn)窗口結(jié)束輸出結(jié)果之后,后續(xù)有遲到的數(shù)據(jù)也不會對原有的結(jié)果發(fā)生更改了,即輸出結(jié)果值是定值(不考慮 allowLateness)。而 Group by 聚合是沒有時間語義的,不管數(shù)據(jù)遲到多長時間,只要數(shù)據(jù)來了,就把上一次的輸出的結(jié)果數(shù)據(jù)撤回,然后把計算好的新的結(jié)果數(shù)據(jù)發(fā)出。

運行層面:窗口聚合是和 時間 綁定的,窗口聚合其中窗口的計算結(jié)果觸發(fā)都是由時間(Watermark)推動的。Group by 聚合完全由數(shù)據(jù)推動觸發(fā)計算,新來一條數(shù)據(jù)去根據(jù)這條數(shù)據(jù)進(jìn)行計算出結(jié)果發(fā)出;由此可見兩者的實現(xiàn)方式也大為不同。

  • SQL 語義

也是拿離線和實時做對比,Orders 為 kafka,target_table 為 Kafka,這個 SQL 生成的實時任務(wù),在執(zhí)行時,會生成三個算子:

數(shù)據(jù)源算子?(From Order):數(shù)據(jù)源算子一直運行,實時的從 Order Kafka 中一條一條的讀取數(shù)據(jù),然后一條一條發(fā)送給下游的Group 聚合算子,向下游發(fā)送數(shù)據(jù)的 shuffle 策略是根據(jù) group by 中的 key 進(jìn)行發(fā)送,相同的 key 發(fā)到同一個 SubTask(并發(fā)) 中。

Group 聚合算子?(group by key + sum\count\max\min):接收到上游算子發(fā)的一條一條的數(shù)據(jù),去狀態(tài) state 中找這個 key 之前的 sum\count\max\min 結(jié)果。如果有結(jié)果oldResult?,拿出來和當(dāng)前的數(shù)據(jù)進(jìn)行sum\count\max\min? 計算出這個 key 的新結(jié)果newResult?,并將新結(jié)果[key, newResult]? 更新到 state 中,在向下游發(fā)送新計算的結(jié)果之前,先發(fā)一條撤回上次結(jié)果的消息-[key, oldResult]?,然后再將新結(jié)果發(fā)往下游+[key, newResult]?;如果 state 中沒有當(dāng)前 key 的結(jié)果,則直接使用當(dāng)前這條數(shù)據(jù)計算 sum\max\min 結(jié)果newResult?,并將新結(jié)果[key, newResult]? 更新到 state 中,當(dāng)前是第一次往下游發(fā),則不需要先發(fā)回撤消息,直接發(fā)送+[key, newResult]。

數(shù)據(jù)匯算子(INSERT INTO target_table):接收到上游發(fā)的一條一條的數(shù)據(jù),寫入到 target_table Kafka 中。

這個實時任務(wù)也是 24 小時一直在運行的,所有的算子在同一時刻都是處于 running 狀態(tài)的。

特別注意:

  • Group by 聚合涉及到了回撤流(也叫 retract 流),會產(chǎn)生回撤流是因為從整個 SQL 的語義來看,上游的 Kafk數(shù)據(jù)是源源不斷的,無窮無盡的,那么每次這個 SQL 任務(wù)產(chǎn)出的結(jié)果都是一個中間結(jié)果,所以每次結(jié)果發(fā)生更新時,都需要將上一次發(fā)出的中間結(jié)果給撤回,然后將最新的結(jié)果發(fā)下去。
  • Group by 聚合涉及到了狀態(tài):狀態(tài)大小也取決于不同 key 的數(shù)量。為了防止?fàn)顟B(tài)無限變大,我們可以設(shè)置狀態(tài)的 TTL。以上面的 SQL 為例,上面 SQL 是按照分鐘進(jìn)行聚合的,理論上到了今天,通常我們就可以不用關(guān)心昨天的數(shù)據(jù)了,那么我們可以設(shè)置狀態(tài)過期時間為一天。關(guān)于狀態(tài)過期時間的設(shè)置參數(shù)可以參考下文運行時參數(shù) 小節(jié)。

如果這個 SQL 放在 Hive 中執(zhí)行時,其中 Orders 為 Hive,target_table 也為 Hive,其也會生成三個相同的算子,但是其和實時任務(wù)的執(zhí)行方式完全不同:

  • 數(shù)據(jù)源算子?(From Order):數(shù)據(jù)源算子從 Order Hive 中讀取到所有的數(shù)據(jù),然后所有數(shù)據(jù)發(fā)送給下游的Group 聚合算子,向下游發(fā)送數(shù)據(jù)的 shuffle 策略是根據(jù) group by 中的 key 進(jìn)行發(fā)送,相同的 key 發(fā)到同一個算子中,然后這個算子就運行結(jié)束了,釋放資源了。
  • Group 聚合算子?(group by + sum\count\max\min):接收到上游算子發(fā)的所有數(shù)據(jù),然后遍歷計算 sum\count\max\min 結(jié)果,批量發(fā)給下游數(shù)據(jù)匯算子,這個算子也就運行結(jié)束了,釋放資源了。
  • 數(shù)據(jù)匯算子(INSERT INTO target_table):接收到上游發(fā)的一條一條的數(shù)據(jù),寫入到 target_table Hive 中,整個任務(wù)也就運行結(jié)束了,整個任務(wù)的資源也就都釋放了。

Group 聚合支持 Grouping sets、Rollup、Cube

Group 聚合也支持 Grouping sets、Rollup、Cube。

舉一個 Grouping sets 的案例:

SELECT 
supplier_id
, rating
, product_id
, COUNT(*)
FROM (VALUES
('supplier1', 'product1', 4),
('supplier1', 'product2', 3),
('supplier2', 'product3', 3),
('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY GROUPING SET (
( supplier_id, product_id, rating ),
( supplier_id, product_id ),
( supplier_id, rating ),
( supplier_id ),
( product_id, rating ),
( product_id ),
( rating ),
( )
)?

責(zé)任編輯:武曉燕 來源: 大數(shù)據(jù)羊說
相關(guān)推薦

2022-06-10 09:01:04

OverFlinkSQL

2022-07-05 09:03:05

Flink SQLTopN

2022-05-22 10:02:32

CREATESQL 查詢SQL DDL

2022-06-29 09:01:38

FlinkSQL時間屬性

2022-05-18 09:02:28

Flink SQLSQL字符串

2022-05-15 09:57:59

Flink SQL時間語義

2022-05-27 09:02:58

SQLHive語義

2021-12-09 06:59:24

FlinkSQL 開發(fā)

2022-06-18 09:26:00

Flink SQLJoin 操作

2022-05-12 09:02:47

Flink SQL數(shù)據(jù)類型

2021-11-28 11:36:08

SQL Flink Join

2022-08-10 10:05:29

FlinkSQL

2021-11-27 09:03:26

flink join數(shù)倉

2021-09-12 07:01:07

Flink SQL ETL datastream

2021-12-17 07:54:16

Flink SQLTable DataStream

2022-07-12 09:02:18

Flink SQL去重

2021-12-06 07:15:47

開發(fā)Flink SQL

2022-05-09 09:03:04

SQL數(shù)據(jù)流數(shù)據(jù)

2021-11-24 08:17:21

Flink SQLCumulate WiSQL

2018-08-27 06:30:49

InnoDBMySQLMyISAM
點贊
收藏

51CTO技術(shù)棧公眾號

主站蜘蛛池模板: 成人二区三区 | 剑来高清在线观看 | 搞黄网站在线观看 | 成人免费视频网站在线观看 | 美女天堂在线 | 在线91| 91在线综合 | 精品国产一区二区三区免费 | av中文在线观看 | 欧美极品在线观看 | 99久久久无码国产精品 | 男女羞羞视频网站 | 日韩精品免费在线观看 | 欧洲精品码一区二区三区免费看 | 欧美精品成人一区二区三区四区 | 久久成人免费 | 伊人青青久久 | 麻豆精品国产91久久久久久 | 视频一区二区在线观看 | 亚洲日韩中文字幕一区 | 三级黄色片在线播放 | 免费成人高清在线视频 | 一区二区三区观看视频 | 亚洲播放 | 久久成人av | 日本字幕在线观看 | 亚洲精品久久久久久久久久久久久 | ww 255hh 在线观看 | 免费网站国产 | 在线一区视频 | 日韩午夜 | 亚洲欧美日韩在线不卡 | 国产精品一区二区三区久久久 | 777zyz色资源站在线观看 | 日本午夜视频 | 亚洲欧美成人 | 人成在线 | 在线视频亚洲 | 国产免费av在线 | 亚洲精品日韩一区二区电影 | 国产羞羞视频在线观看 |