成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Flink SQL 知其所以然之流 join 很難嘛???(上)

開(kāi)發(fā) 架構(gòu)
在實(shí)時(shí)數(shù)倉(cāng)中,regular join 以及 interval join,以及兩種 join 的結(jié)合使用是最常使用的。所以本文主要介紹這兩種(太長(zhǎng)的篇幅大家可能也不想看,所以之后的文章就以簡(jiǎn)潔,短為目標(biāo))。

[[437040]]

1.序篇

進(jìn)入正文。

下面即是文章目錄,也對(duì)應(yīng)到本文的結(jié)論,小伙伴可以先看結(jié)論快速了解本文能給你帶來(lái)什么幫助:

  • 背景及應(yīng)用場(chǎng)景介紹:join 作為離線數(shù)倉(cāng)中最常見(jiàn)的場(chǎng)景,在實(shí)時(shí)數(shù)倉(cāng)中也必然不可能缺少它,flink sql 提供的豐富的 join 方式(總結(jié) 6 種:regular join,維表 join,temporal join,interval join,array 拍平,table function 函數(shù))對(duì)我們滿足需求提供了強(qiáng)大的后盾
  • 先來(lái)一個(gè)實(shí)戰(zhàn)案例:以一個(gè)曝光日志 left join 點(diǎn)擊日志為案例展開(kāi),介紹 flink sql join 的解決方案
  • flink sql join 的解決方案以及存在問(wèn)題的介紹:主要介紹 regular join 的在上述案例的運(yùn)行結(jié)果及分析源碼機(jī)制,它雖然簡(jiǎn)單,但是 left join,right join,full join 會(huì)存在著 retract 的問(wèn)題,所以在使用前,你應(yīng)該充分了解其運(yùn)行機(jī)制,避免出現(xiàn)數(shù)據(jù)發(fā)重,發(fā)多的問(wèn)題。
  • 本文主要介紹 regular join retract 的問(wèn)題,下節(jié)介紹怎么使用 interval join 來(lái)避免這種 retract 問(wèn)題,并滿足第 2 點(diǎn)的實(shí)戰(zhàn)案例需求。

2.背景及應(yīng)用場(chǎng)景介紹

在我們的日常場(chǎng)景中,應(yīng)用最廣的一種操作必然有 join 的一席之地,例如

計(jì)算曝光數(shù)據(jù)和點(diǎn)擊數(shù)據(jù)的 CTR,需要通過(guò)唯一 id 進(jìn)行 join 關(guān)聯(lián)

事實(shí)數(shù)據(jù)關(guān)聯(lián)維度數(shù)據(jù)獲取維度,進(jìn)而計(jì)算維度指標(biāo)

上述場(chǎng)景,在離線數(shù)倉(cāng)應(yīng)用之廣就不多說(shuō)了。

那么,實(shí)時(shí)流之間的關(guān)聯(lián)要怎么操作呢?

flink sql 為我們提供了四種強(qiáng)大的關(guān)聯(lián)方式,幫助我們?cè)诹魇綀?chǎng)景中達(dá)到流關(guān)聯(lián)的目的。如下圖官網(wǎng)截圖所示:

join

  • regular join:即 left join,right join,full join,inner join
  • 維表 lookup join:維表關(guān)聯(lián)
  • temporal join:快照表 join
  • interval join:兩條流在一段時(shí)間區(qū)間之內(nèi)的 join
  • array 炸開(kāi):列轉(zhuǎn)行
  • table function join:通過(guò) table function 自定義函數(shù)實(shí)現(xiàn) join(類(lèi)似于列轉(zhuǎn)行的效果,或者說(shuō)類(lèi)似于維表 join 的效果)

在實(shí)時(shí)數(shù)倉(cāng)中,regular join 以及 interval join,以及兩種 join 的結(jié)合使用是最常使用的。所以本文主要介紹這兩種(太長(zhǎng)的篇幅大家可能也不想看,所以之后的文章就以簡(jiǎn)潔,短為目標(biāo))。

3.先來(lái)一個(gè)實(shí)戰(zhàn)案例

先來(lái)一個(gè)實(shí)際案例來(lái)看看在具體輸入值的場(chǎng)景下,輸出值應(yīng)該長(zhǎng)啥樣。

場(chǎng)景:即常見(jiàn)的曝光日志流(show_log)通過(guò) log_id 關(guān)聯(lián)點(diǎn)擊日志流(click_log),將數(shù)據(jù)的關(guān)聯(lián)結(jié)果進(jìn)行下發(fā)。

來(lái)一波輸入數(shù)據(jù):

曝光數(shù)據(jù):

log_id timestamp show_params
1 2021-11-01 00:01:03 show_params
2 2021-11-01 00:03:00 show_params2
3 2021-11-01 00:05:00 show_params3

點(diǎn)擊數(shù)據(jù):

log_id timestamp click_params
1 2021-11-01 00:01:53 click_params
2 2021-11-01 00:02:01 click_params2

預(yù)期輸出數(shù)據(jù)如下:

log_id timestamp show_params click_params
1 2021-11-01 00:01:00 show_params click_params
2 2021-11-01 00:01:00 show_params2 click_params2
3 2021-11-01 00:02:00 show_params3 null

熟悉離線 hive sql 的同學(xué)可能 10s 就寫(xiě)完上面這個(gè) sql 了,如下 hive sql

  1. INSERT INTO sink_table 
  2. SELECT 
  3.     show_log.log_id as log_id, 
  4.     show_log.timestamp as timestamp
  5.     show_log.show_params as show_params, 
  6.     click_log.click_params as click_params 
  7. FROM show_log 
  8. LEFT JOIN click_log ON show_log.log_id = click_log.log_id; 

那么我們看看上述需求如果要以 flink sql 實(shí)現(xiàn)需要怎么做呢?

雖然不 flink sql 提供了 left join 的能力,但是在實(shí)際使用時(shí),可能會(huì)出現(xiàn)預(yù)期之外的問(wèn)題。下節(jié)詳述。

4.flink sql join

4.1.flink sql

還是上面的案例,我們先實(shí)際跑一遍看看結(jié)果:

  1. INSERT INTO sink_table 
  2. SELECT 
  3.     show_log.log_id as log_id, 
  4.     show_log.timestamp as timestamp
  5.     show_log.show_params as show_params, 
  6.     click_log.click_params as click_params 
  7. FROM show_log 
  8. LEFT JOIN click_log ON show_log.log_id = click_log.log_id; 

flink web ui 算子圖如下:

flink web ui

結(jié)果如下:

  1. +[1 | 2021-11-01 00:01:03 | show_params | null
  2.  
  3. -[1 | 2021-11-01 00:01:03 | show_params | null
  4.  
  5. +[1 | 2021-11-01 00:01:03 | show_params | click_params] 
  6.  
  7. +[2 | 2021-11-01 00:03:00 | show_params | click_params] 
  8.  
  9. +[3 | 2021-11-01 00:05:00 | show_params | null

從結(jié)果上看,其輸出數(shù)據(jù)有 +,-,代表其輸出的數(shù)據(jù)是一個(gè) retract 流的數(shù)據(jù)。分析原因發(fā)現(xiàn)是,由于第一條 show_log 先于 click_log 到達(dá), 所以就先直接發(fā)出 +[1 | 2021-11-01 00:01:03 | show_params | null],后面 click_log 到達(dá)之后,將上一次未關(guān)聯(lián)到的 show_log 撤回, 然后將關(guān)聯(lián)到的 +[1 | 2021-11-01 00:01:03 | show_params | click_params] 下發(fā)。

但是 retract 流會(huì)導(dǎo)致寫(xiě)入到 kafka 的數(shù)據(jù)變多,這是不可被接受的。我們期望的結(jié)果應(yīng)該是一個(gè) append 數(shù)據(jù)流。

為什么 left join 會(huì)出現(xiàn)這種問(wèn)題呢?那就要從 left join 的原理說(shuō)起了。

來(lái)定位到具體的實(shí)現(xiàn)源碼。先看一下 transformations。

transformations

可以看到 left join 的具體 operator 是 org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator。

其核心邏輯就集中在 processElement 方法上面。并且源碼對(duì)于 processElement 的處理邏輯有詳細(xì)的注釋說(shuō)明,如下圖所示。

StreamingJoinOperator#processElement

注釋看起來(lái)邏輯比較復(fù)雜。我們這里按照 left join,inner join,right join,full join 分類(lèi)給大家解釋一下。

4.2.left join

首先是 left join,以上面的 show_log(左表) left join click_log(右表) 為例:

  • 首先如果 join xxx on 中的條件是等式則代表 join 是在相同 key 下進(jìn)行的,join 的 key 即 show_log.log_id,click_log.log_id,相同 key 的數(shù)據(jù)會(huì)被發(fā)送到一個(gè)并發(fā)中進(jìn)行處理。如果 join xxx on 中的條件是不等式,則兩個(gè)流的 source 算子向 join 算子下發(fā)數(shù)據(jù)是按照 global 的 partition 策略進(jìn)行下發(fā)的,并且 join 算子并發(fā)會(huì)被設(shè)置為 1,所有的數(shù)據(jù)會(huì)被發(fā)送到這一個(gè)并發(fā)中處理。
  • 相同 key 下,當(dāng) show_log 來(lái)一條數(shù)據(jù),如果 click_log 有數(shù)據(jù):則 show_log 與 click_log 中的所有數(shù)據(jù)進(jìn)行遍歷關(guān)聯(lián)一遍輸出[+(show_log,click_log)]數(shù)據(jù),并且把 show_log 保存到左表的狀態(tài)中(以供后續(xù) join 使用)。
  • 相同 key 下,當(dāng) show_log 來(lái)一條數(shù)據(jù),如果 click_log 中沒(méi)有數(shù)據(jù):則 show_log 不會(huì)等待,直接輸出[+(show_log,null)]數(shù)據(jù),并且把 show_log 保存到左表的狀態(tài)中(以供后續(xù) join 使用)。
  • 相同 key 下,當(dāng) click_log 來(lái)一條數(shù)據(jù),如果 show_log 有數(shù)據(jù):則 click_log 對(duì) show_log 中所有的數(shù)據(jù)進(jìn)行遍歷關(guān)聯(lián)一遍。在輸出數(shù)據(jù)前,會(huì)判斷,如果被關(guān)聯(lián)的這條 show_log 之前沒(méi)有關(guān)聯(lián)到過(guò) click_log(即往下發(fā)過(guò)[+(show_log,null)]),則先發(fā)一條[-(show_log,null)],后發(fā)一條[+(show_log,click_log)] ,代表把之前的那條沒(méi)有關(guān)聯(lián)到 click_log 數(shù)據(jù)的 show_log 中間結(jié)果給撤回,把當(dāng)前關(guān)聯(lián)到的最新結(jié)果進(jìn)行下發(fā),并把 click_log 保存到右表的狀態(tài)中(以供后續(xù)左表進(jìn)行關(guān)聯(lián))。這也就解釋了為什么輸出流是一個(gè) retract 流。
  • 相同 key 下,當(dāng) click_log 來(lái)一條數(shù)據(jù),如果 show_log 沒(méi)有數(shù)據(jù):把 click_log 保存到右表的狀態(tài)中(以供后續(xù)左表進(jìn)行關(guān)聯(lián))。

4.3.inner join

以上面的 show_log(左表) inner join click_log(右表) 為例:

首先如果 join xxx on 中的條件是等式則代表 join 是在相同 key 下進(jìn)行的,join 的 key 即 show_log.log_id,click_log.log_id,相同 key 的數(shù)據(jù)會(huì)被發(fā)送到一個(gè)并發(fā)中進(jìn)行處理。如果 join xxx on 中的條件是不等式,則兩個(gè)流的 source 算子向 join 算子下發(fā)數(shù)據(jù)是按照 global 的 partition 策略進(jìn)行下發(fā)的,并且 join 算子并發(fā)會(huì)被設(shè)置為 1,所有的數(shù)據(jù)會(huì)被發(fā)送到這一個(gè)并發(fā)中處理。

相同 key 下,當(dāng) show_log 來(lái)一條數(shù)據(jù),如果 click_log 有數(shù)據(jù):則 show_log 與 click_log 中的所有數(shù)據(jù)進(jìn)行遍歷關(guān)聯(lián)一遍輸出[+(show_log,click_log)]數(shù)據(jù),并且把 show_log 保存到左表的狀態(tài)中(以供后續(xù) join 使用)。

相同 key 下,當(dāng) show_log 來(lái)一條數(shù)據(jù),如果 click_log 中沒(méi)有數(shù)據(jù):則 show_log 不會(huì)輸出數(shù)據(jù),會(huì)把 show_log 保存到左表的狀態(tài)中(以供后續(xù) join 使用)。

相同 key 下,當(dāng) click_log 來(lái)一條數(shù)據(jù),如果 show_log 有數(shù)據(jù):則 click_log 與 show_log 中的所有數(shù)據(jù)進(jìn)行遍歷關(guān)聯(lián)一遍輸出[+(show_log,click_log)]數(shù)據(jù),并且把 click_log 保存到右表的狀態(tài)中(以供后續(xù) join 使用)。

相同 key 下,當(dāng) click_log 來(lái)一條數(shù)據(jù),如果 show_log 沒(méi)有數(shù)據(jù):則 click_log 不會(huì)輸出數(shù)據(jù),會(huì)把 click_log 保存到右表的狀態(tài)中(以供后續(xù) join 使用)。

4.4.right join

right join 和 left join 一樣,只不過(guò)順序反了,這里不再贅述。

4.5.full join

以上面的 show_log(左表) full join click_log(右表) 為例:

  • 首先如果 join xxx on 中的條件是等式則代表 join 是在相同 key 下進(jìn)行的,join 的 key 即 show_log.log_id,click_log.log_id,相同 key 的數(shù)據(jù)會(huì)被發(fā)送到一個(gè)并發(fā)中進(jìn)行處理。如果 join xxx on 中的條件是不等式,則兩個(gè)流的 source 算子向 join 算子下發(fā)數(shù)據(jù)是按照 global 的 partition 策略進(jìn)行下發(fā)的,并且 join 算子并發(fā)會(huì)被設(shè)置為 1,所有的數(shù)據(jù)會(huì)被發(fā)送到這一個(gè)并發(fā)中處理。
  • 相同 key 下,當(dāng) show_log 來(lái)一條數(shù)據(jù),如果 click_log 有數(shù)據(jù):則 show_log 對(duì) click_log 中所有的數(shù)據(jù)進(jìn)行遍歷關(guān)聯(lián)一遍。在輸出數(shù)據(jù)前,會(huì)判斷,如果被關(guān)聯(lián)的這條 click_log 之前沒(méi)有關(guān)聯(lián)到過(guò) show_log(即往下發(fā)過(guò)[+(null,click_log)]),則先發(fā)一條[-(null,click_log)],后發(fā)一條[+(show_log,click_log)] ,代表把之前的那條沒(méi)有關(guān)聯(lián)到 show_log 數(shù)據(jù)的 click_log 中間結(jié)果給撤回,把當(dāng)前關(guān)聯(lián)到的最新結(jié)果進(jìn)行下發(fā),并把 show_log 保存到左表的狀態(tài)中(以供后續(xù) join 使用)
  • 相同 key 下,當(dāng) show_log 來(lái)一條數(shù)據(jù),如果 click_log 中沒(méi)有數(shù)據(jù):則 show_log 不會(huì)等待,直接輸出[+(show_log,null)]數(shù)據(jù),并且把 show_log 保存到左表的狀態(tài)中(以供后續(xù) join 使用)。
  • 相同 key 下,當(dāng) click_log 來(lái)一條數(shù)據(jù),如果 show_log 有數(shù)據(jù):則 click_log 對(duì) show_log 中所有的數(shù)據(jù)進(jìn)行遍歷關(guān)聯(lián)一遍。在輸出數(shù)據(jù)前,會(huì)判斷,如果被關(guān)聯(lián)的這條 show_log 之前沒(méi)有關(guān)聯(lián)到過(guò) click_log(即往下發(fā)過(guò)[+(show_log,null)]),則先發(fā)一條[-(show_log,null)],后發(fā)一條[+(show_log,click_log)] ,代表把之前的那條沒(méi)有關(guān)聯(lián)到 click_log 數(shù)據(jù)的 show_log 中間結(jié)果給撤回,把當(dāng)前關(guān)聯(lián)到的最新結(jié)果進(jìn)行下發(fā),并把 click_log 保存到右表的狀態(tài)中(以供后續(xù) join 使用)
  • 相同 key 下,當(dāng) click_log 來(lái)一條數(shù)據(jù),如果 show_log 中沒(méi)有數(shù)據(jù):則 click_log 不會(huì)等待,直接輸出[+(null,click_log)]數(shù)據(jù),并且把 click_log 保存到右表的狀態(tài)中(以供后續(xù) join 使用)。

4.6.regular join 的總結(jié)

總的來(lái)說(shuō)上述四種 join 可以按照以下這么劃分。

inner join 會(huì)互相等,直到有數(shù)據(jù)才下發(fā)。

left join,right join,full join 不會(huì)互相等,只要來(lái)了數(shù)據(jù),會(huì)嘗試關(guān)聯(lián),能關(guān)聯(lián)到則下發(fā)的字段是全的,關(guān)聯(lián)不到則另一邊的字段為 null。后續(xù)數(shù)據(jù)來(lái)了之后,發(fā)現(xiàn)之前下發(fā)過(guò)為沒(méi)有關(guān)聯(lián)到的數(shù)據(jù)時(shí),就會(huì)做回撤,把關(guān)聯(lián)到的結(jié)果進(jìn)行下發(fā)

4.7.怎樣才能解決 retract 導(dǎo)致數(shù)據(jù)重復(fù)下發(fā)到 kafka 這個(gè)問(wèn)題呢?

既然 flink sql 在 left join、right join、full join 實(shí)現(xiàn)上的原理就是以這種 retract 的方式去實(shí)現(xiàn)的,就不能通過(guò)這種方式來(lái)滿足業(yè)務(wù)了。

我們來(lái)轉(zhuǎn)變一下思路,上述 join 的特點(diǎn)就是不會(huì)相互等,那有沒(méi)有一種 join 是可以相互等待的呢。以 left join 的思路為例,左表在關(guān)聯(lián)不到右表的時(shí)候,可以選擇等待一段時(shí)間,如果超過(guò)這段時(shí)間還等不到再下發(fā) (show_log,null),如果等到了就下發(fā)(show_log,click_log)。

interval join 閃亮登場(chǎng)。關(guān)于 interval join 是如何實(shí)現(xiàn)上述場(chǎng)景,及其原理實(shí)現(xiàn),本篇的(下)會(huì)詳細(xì)介紹,敬請(qǐng)期待。

5.總結(jié)與展望

源碼公眾號(hào)后臺(tái)回復(fù)1.13.2 sql join 的奇妙解析之路獲取。

本文主要介紹了 flink sql regular 的在滿足 join 場(chǎng)景時(shí)存在的問(wèn)題,并通過(guò)解析其實(shí)現(xiàn)說(shuō)明了運(yùn)行原理,主要包含下面兩部分:

 

  • 背景及應(yīng)用場(chǎng)景介紹:join 作為離線數(shù)倉(cāng)中最常見(jiàn)的場(chǎng)景,在實(shí)時(shí)數(shù)倉(cāng)中也必然不可能缺少它,flink sql 提供的豐富的 join 方式(總結(jié) 4 種:regular join,維表 join,temporal join,interval join)對(duì)我們滿足需求提供了強(qiáng)大的后盾
  • 先來(lái)一個(gè)實(shí)戰(zhàn)案例:以一個(gè)曝光日志 left join 點(diǎn)擊日志為案例展開(kāi),介紹 flink sql join 的解決方案
  • flink sql join 的解決方案以及存在問(wèn)題的介紹:主要介紹 regular join 的在上述案例的運(yùn)行結(jié)果及分析源碼機(jī)制,它雖然簡(jiǎn)單,但是 left join,right join,full join 會(huì)存在著 retract 的問(wèn)題,所以在使用前,你應(yīng)該充分了解其運(yùn)行機(jī)制,避免出現(xiàn)數(shù)據(jù)發(fā)重,發(fā)多的問(wèn)題。
  • 本文主要介紹 regular join retract 的問(wèn)題,下節(jié)介紹怎么使用 interval join 來(lái)避免這種 retract 問(wèn)題,并滿足第 2 點(diǎn)的實(shí)戰(zhàn)案例需求。

 

責(zé)任編輯:武曉燕 來(lái)源: 大數(shù)據(jù)羊說(shuō)
相關(guān)推薦

2021-11-28 11:36:08

SQL Flink Join

2022-05-22 10:02:32

CREATESQL 查詢SQL DDL

2022-07-05 09:03:05

Flink SQLTopN

2022-06-10 09:01:04

OverFlinkSQL

2022-06-06 09:27:23

FlinkSQLGroup

2022-06-18 09:26:00

Flink SQLJoin 操作

2022-05-18 09:02:28

Flink SQLSQL字符串

2022-05-15 09:57:59

Flink SQL時(shí)間語(yǔ)義

2022-06-29 09:01:38

FlinkSQL時(shí)間屬性

2021-12-09 06:59:24

FlinkSQL 開(kāi)發(fā)

2022-05-27 09:02:58

SQLHive語(yǔ)義

2022-05-12 09:02:47

Flink SQL數(shù)據(jù)類(lèi)型

2022-08-10 10:05:29

FlinkSQL

2021-11-30 23:30:45

sql 性能異步

2021-09-12 07:01:07

Flink SQL ETL datastream

2021-12-17 07:54:16

Flink SQLTable DataStream

2021-12-06 07:15:47

開(kāi)發(fā)Flink SQL

2021-12-05 08:28:39

Flink SQLbatch lookuSQL

2022-05-09 09:03:04

SQL數(shù)據(jù)流數(shù)據(jù)

2021-11-24 08:17:21

Flink SQLCumulate WiSQL
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)

主站蜘蛛池模板: 国产亚洲精品91 | 蜜桃综合在线 | 国产视频久久 | 福利精品 | 国产精品久久久久久久久久久免费看 | 久久国产激情视频 | 91一区二区三区 | 韩日精品一区 | av电影一区 | 日韩视频中文字幕 | 成年免费在线观看 | 精品一二三区 | 精品日韩 | 国产一区二区在线播放视频 | 一级片在线免费看 | 欧美一区二区三区久久精品 | 91成人免费看片 | 欧美一a一片一级一片 | 一区二区亚洲 | 国产综合视频 | 精品久久久久久久久亚洲 | 国产综合久久 | 亚洲永久免费观看 | 久久精品国产久精国产 | 国产精品不卡一区 | 人人做人人澡人人爽欧美 | 国产在线观| 成人网址在线观看 | 欧美日韩亚洲一区 | 免费成年网站 | 国产成人在线视频 | 欧美国产日韩一区 | 欧美视频二区 | 日日摸天天添天天添破 | 国产在线麻豆精品入口 | 成人午夜性成交 | 日日天天 | av无遮挡 | 天天操精品视频 | 一级黄色网页 | 国产在线视频在线观看 |