成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Flink SQL 如何實(shí)現(xiàn)數(shù)據(jù)流的 Join?

大數(shù)據(jù)
無論在 OLAP 還是 OLTP 領(lǐng)域,Join 都是業(yè)務(wù)常會涉及到且優(yōu)化規(guī)則比較復(fù)雜的 SQL 語句。對于離線計算而言,經(jīng)過數(shù)據(jù)庫領(lǐng)域多年的積累,Join 語義以及實(shí)現(xiàn)已經(jīng)十分成熟,然而對于近年來剛興起的 Streaming SQL 來說 Join 卻處于剛起步的狀態(tài)。

無論在 OLAP 還是 OLTP 領(lǐng)域,Join 都是業(yè)務(wù)常會涉及到且優(yōu)化規(guī)則比較復(fù)雜的 SQL 語句。對于離線計算而言,經(jīng)過數(shù)據(jù)庫領(lǐng)域多年的積累,Join 語義以及實(shí)現(xiàn)已經(jīng)十分成熟,然而對于近年來剛興起的 Streaming SQL 來說 Join 卻處于剛起步的狀態(tài)。

其中最為關(guān)鍵的問題在于 Join 的實(shí)現(xiàn)依賴于緩存整個數(shù)據(jù)集,而 Streaming SQL Join 的對象卻是無限的數(shù)據(jù)流,內(nèi)存壓力和計算效率在長期運(yùn)行來說都是不可避免的問題。下文將結(jié)合 SQL 的發(fā)展解析 Flink SQL 是如何解決這些問題并實(shí)現(xiàn)兩個數(shù)據(jù)流的 Join。

離線 Batch SQL Join 的實(shí)現(xiàn)

傳統(tǒng)的離線 Batch SQL (面向有界數(shù)據(jù)集的 SQL)有三種基礎(chǔ)的實(shí)現(xiàn)方式,分別是 Nested-loop Join、Sort-Merge Join 和 Hash Join。

  • Nested-loop Join 最為簡單直接,將兩個數(shù)據(jù)集加載到內(nèi)存,并用內(nèi)嵌遍歷的方式來逐個比較兩個數(shù)據(jù)集內(nèi)的元素是否符合 Join 條件。Nested-loop Join 雖然時間效率以及空間效率都是最低的,但勝在比較靈活適用范圍廣,因此其變體 BNL 常被傳統(tǒng)數(shù)據(jù)庫用作為 Join 的默認(rèn)基礎(chǔ)選項。
  • Sort-Merge Join 顧名思義,分為兩個 Sort 和 Merge 階段。首先將兩個數(shù)據(jù)集進(jìn)行分別排序,然后對兩個有序數(shù)據(jù)集分別進(jìn)行遍歷和匹配,類似于歸并排序的合并。值得注意的是,Sort-Merge 只適用于 Equi-Join(Join 條件均使用等于作為比較算子)。Sort-Merge Join 要求對兩個數(shù)據(jù)集進(jìn)行排序,成本很高,通常作為輸入本就是有序數(shù)據(jù)集的情況下的優(yōu)化方案。
  • Hash Join 同樣分為兩個階段,首先將一個數(shù)據(jù)集轉(zhuǎn)換為 Hash Table,然后遍歷另外一個數(shù)據(jù)集元素并與 Hash Table 內(nèi)的元素進(jìn)行匹配。第一階段和第一個數(shù)據(jù)集分別稱為 build 階段和 build table,第二個階段和第二個數(shù)據(jù)集分別稱為 probe 階段和 probe table。Hash Join 效率較高但對空間要求較大,通常是作為 Join 其中一個表為適合放入內(nèi)存的小表的情況下的優(yōu)化方案。和 Sort-Merge Join 類似,Hash Join 也只適用于 Equi-Join。

實(shí)時 Streaming SQL Join

相對于離線的 Join,實(shí)時 Streaming SQL(面向無界數(shù)據(jù)集的 SQL)無法緩存所有數(shù)據(jù),因此 Sort-Merge Join 要求的對數(shù)據(jù)集進(jìn)行排序基本是無法做到的,而 Nested-loop Join 和 Hash Join 經(jīng)過一定的改良則可以滿足實(shí)時 SQL 的要求。

我們通過例子來看基本的 Nested Join 在實(shí)時 Streaming SQL 的基礎(chǔ)實(shí)現(xiàn)(案例及圖來自 Piotr Nowojski 在 Flink Forward San Francisco 的分享[2])。

 

Flink SQL 如何實(shí)現(xiàn)數(shù)據(jù)流的 Join?
圖1. Join-in-continuous-query-1

Table A 有 1、42 兩個元素,Table B 有 42 一個元素,所以此時的 Join 結(jié)果會輸出 42。

 

Flink SQL 如何實(shí)現(xiàn)數(shù)據(jù)流的 Join?
圖2. Join-in-continuous-query-2

接著 Table B 依次接受到三個新的元素,分別是 7、3、1。因為 1 匹配到 Table A 的元素,因此結(jié)果表再輸出一個元素 1。

 

Flink SQL 如何實(shí)現(xiàn)數(shù)據(jù)流的 Join?
圖3. Join-in-continuous-query-3

隨后 Table A 出現(xiàn)新的輸入 2、3、6,3 匹配到 Table B 的元素,因此再輸出 3 到結(jié)果表。

可以看到在 Nested-Loop Join 中我們需要保存兩個輸入表的內(nèi)容,而隨著時間的增長 Table A 和 Table B 需要保存的歷史數(shù)據(jù)無止境地增長,導(dǎo)致很不合理的內(nèi)存磁盤資源占用,而且單個元素的匹配效率也會越來越低。類似的問題也存在于 Hash Join 中。

那么有沒有可能設(shè)置一個緩存剔除策略,將不必要的歷史數(shù)據(jù)及時清理呢?答案是肯定的,關(guān)鍵在于緩存剔除策略如何實(shí)現(xiàn),這也是 Flink SQL 提供的三種 Join 的主要區(qū)別。

Flink SQL 的 Join

Regular Join

Regular Join 是最為基礎(chǔ)的沒有緩存剔除策略的 Join。Regular Join 中兩個表的輸入和更新都會對全局可見,影響之后所有的 Join 結(jié)果。舉例,在一個如下的 Join 查詢里,Orders 表的新紀(jì)錄會和 Product 表所有歷史紀(jì)錄以及未來的紀(jì)錄進(jìn)行匹配。

  1. SELECT * FROM OrdersINNER JOIN ProductON Orders.productId = Product.id 

因為歷史數(shù)據(jù)不會被清理,所以 Regular Join 允許對輸入表進(jìn)行任意種類的更新操作(insert、update、delete)。然而因為資源問題 Regular Join 通常是不可持續(xù)的,一般只用做有界數(shù)據(jù)流的 Join。

Time-Windowed Join

Time-Windowed Join 利用窗口給兩個輸入表設(shè)定一個 Join 的時間界限,超出時間范圍的數(shù)據(jù)則對 JOIN 不可見并可以被清理掉。值得注意的是,這里涉及到的一個問題是時間的語義,時間可以指計算發(fā)生的系統(tǒng)時間(即 Processing Time),也可以指從數(shù)據(jù)本身的時間字段提取的 Event Time。如果是 Processing Time,F(xiàn)link 根據(jù)系統(tǒng)時間自動劃分 Join 的時間窗口并定時清理數(shù)據(jù);如果是 Event Time,F(xiàn)link 分配 Event Time 窗口并依據(jù) Watermark 來清理數(shù)據(jù)。

以更常用的 Event Time Windowed Join 為例,一個將 Orders 訂單表和 Shipments 運(yùn)輸單表依據(jù)訂單時間和運(yùn)輸時間 Join 的查詢?nèi)缦?

  1. SELECT *FROM  Orders o,  Shipments sWHERE  o.id = s.orderId AND s.shiptime BETWEEN o.ordertime AND o.ordertime + INTERVAL '4' HOUR 

這個查詢會為 Orders 表設(shè)置了 o.ordertime > s.shiptime- INTERVAL ‘4’ HOUR 的時間下界(圖4)。

 

Flink SQL 如何實(shí)現(xiàn)數(shù)據(jù)流的 Join?
圖4. Time-Windowed Join 的時間下界 - Orders 表

并為 Shipmenets 表設(shè)置了 s.shiptime >= o.ordertime 的時間下界(圖5)。

 

Flink SQL 如何實(shí)現(xiàn)數(shù)據(jù)流的 Join?
圖5. Time-Windowed Join 的時間下界 - Shipment 表

因此兩個輸入表都只需要緩存在時間下界以上的數(shù)據(jù),將空間占用維持在合理的范圍。

不過雖然底層實(shí)現(xiàn)上沒有問題,但如何通過 SQL 語法定義時間仍是難點(diǎn)。盡管在實(shí)時計算領(lǐng)域 Event Time、Processing Time、Watermark 這些概念已經(jīng)成為業(yè)界共識,但在 SQL 領(lǐng)域?qū)r間數(shù)據(jù)類型的支持仍比較弱[4]。因此,定義 Watermark 和時間語義都需要通過編程 API 的方式完成,比如從 DataStream 轉(zhuǎn)換至 Table ,不能單純靠 SQL 完成。這方面的支持 Flink 社區(qū)計劃通過拓展 SQL 方言來完成,感興趣的讀者可以通過 FLIP-66[7] 來追蹤進(jìn)度。

Temporal Table Join

雖然 Timed-Windowed Join 解決了資源問題,但也限制了使用場景: Join 兩個輸入流都必須有時間下界,超過之后則不可訪問。這對于很多 Join 維表的業(yè)務(wù)來說是不適用的,因為很多情況下維表并沒有時間界限。針對這個問題,F(xiàn)link 提供了 Temporal Table Join 來滿足用戶需求。

Temporal Table Join 類似于 Hash Join,將輸入分為 Build Table 和 Probe Table。前者一般是緯度表的 changelog,后者一般是業(yè)務(wù)數(shù)據(jù)流,典型情況下后者的數(shù)據(jù)量應(yīng)該遠(yuǎn)大于前者。在 Temporal Table Join 中,Build Table 是一個基于 append-only 數(shù)據(jù)流的帶時間版本的視圖,所以又稱為 Temporal Table。Temporal Table 要求定義一個主鍵和用于版本化的字段(通常就是 Event Time 時間字段),以反映記錄在不同時間的內(nèi)容。

比如典型的一個例子是對商業(yè)訂單金額進(jìn)行匯率轉(zhuǎn)換。假設(shè)有一個 Orders 流記錄訂單金額,需要和 RatesHistory 匯率流進(jìn)行 Join。RatesHistory 代表不同貨幣轉(zhuǎn)為日元的匯率,每當(dāng)匯率有變化時就會有一條更新記錄。兩個表在某一時間節(jié)點(diǎn)內(nèi)容如下:

 

Flink SQL 如何實(shí)現(xiàn)數(shù)據(jù)流的 Join?
圖6. Temporal Table Join Example]

我們將 RatesHistory 注冊為一個名為 Rates 的 Temporal Table,設(shè)定主鍵為 currency,版本字段為 time。

 

Flink SQL 如何實(shí)現(xiàn)數(shù)據(jù)流的 Join?
圖7. Temporal Table Registration]

此后給 Rates 指定時間版本,Rates 則會基于 RatesHistory 來計算符合時間版本的匯率轉(zhuǎn)換內(nèi)容。

 

Flink SQL 如何實(shí)現(xiàn)數(shù)據(jù)流的 Join?
圖8. Temporal Table Content]

在 Rates 的幫助下,我們可以將業(yè)務(wù)邏輯用以下的查詢來表達(dá):

  1. SELECT  o.amount * r.rateFROM Orders o, LATERAL Table(Rates(o.time)) rWHERE o.currency = r.currency 

值得注意的是,不同于在 Regular Join 和 Time-Windowed Join 中兩個表是平等的,任意一個表的新記錄都可以與另一表的歷史記錄進(jìn)行匹配,在 Temporal Table Join 中,Temoparal Table 的更新對另一表在該時間節(jié)點(diǎn)以前的記錄是不可見的。這意味著我們只需要保存 Build Side 的記錄直到 Watermark 超過記錄的版本字段。因為 Probe Side 的輸入理論上不會再有早于 Watermark 的記錄,這些版本的數(shù)據(jù)可以安全地被清理掉。

總結(jié)

實(shí)時領(lǐng)域 Streaming SQL 中的 Join 與離線 Batch SQL 中的 Join 最大不同點(diǎn)在于無法緩存完整數(shù)據(jù)集,而是要給緩存設(shè)定基于時間的清理條件以限制 Join 涉及的數(shù)據(jù)范圍。根據(jù)清理策略的不同,F(xiàn)link SQL 分別提供了 Regular Join、Time-Windowed Join 和 Temporal Table Join 來應(yīng)對不同業(yè)務(wù)場景。

另外,盡管在實(shí)時計算領(lǐng)域 Join 可以靈活地用底層編程 API 來實(shí)現(xiàn),但在 Streaming SQL 中 Join 的發(fā)展仍處于比較初級的階段,其中關(guān)鍵點(diǎn)在于如何將時間屬性合適地融入 SQL 中,這點(diǎn) ISO SQL 委員會制定的 SQL 標(biāo)準(zhǔn)并沒有給出完整的答案。或者從另外一個角度來講,作為 Streaming SQL 最早的開拓者之一,F(xiàn)link 社區(qū)很適合探索出一套合理的 SQL 語法反過來貢獻(xiàn)給 ISO。

作者介紹:

林小鉑,網(wǎng)易游戲高級開發(fā)工程師,負(fù)責(zé)游戲數(shù)據(jù)中心實(shí)時平臺的開發(fā)及運(yùn)維工作,目前專注于 Apache Flink 的開發(fā)及應(yīng)用。探究問題本來就是一種樂趣。

責(zé)任編輯:未麗燕 來源: 今日頭條
相關(guān)推薦

2011-12-14 15:57:13

javanio

2009-04-13 16:35:25

TSQL查詢SQL Server

2020-04-14 15:18:16

SparkFlink框架

2016-11-14 19:01:36

數(shù)據(jù)流聊天系統(tǒng)web

2022-03-18 08:57:17

前端數(shù)據(jù)流選型

2009-08-19 10:41:12

Java輸入數(shù)據(jù)流

2020-01-13 14:39:06

FlinkSQL無限流

2011-09-01 18:38:02

SQL Server 文件流功能

2017-11-16 19:26:34

海量數(shù)據(jù)算法計算機(jī)

2021-10-27 10:43:36

數(shù)據(jù)流中位數(shù)偶數(shù)

2019-06-18 13:51:08

大數(shù)據(jù)流處理新興市場

2021-06-08 05:50:00

數(shù)據(jù)流數(shù)字化轉(zhuǎn)型數(shù)字化

2011-04-14 14:43:38

SSISTransformat

2011-08-19 16:07:33

SQL Server數(shù)據(jù)流

2012-07-30 08:31:08

Storm數(shù)據(jù)流

2024-07-05 10:17:08

數(shù)據(jù)流系統(tǒng)CPU

2023-08-18 09:29:59

Java數(shù)據(jù)流

2011-04-19 09:18:02

SSIS數(shù)據(jù)轉(zhuǎn)換

2014-02-11 08:51:15

亞馬遜PaaSAppStream

2021-06-29 19:24:42

數(shù)據(jù)流數(shù)據(jù)排序
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號

主站蜘蛛池模板: 色综合天天天天做夜夜夜夜做 | 黄色毛片在线观看 | 亚洲网站在线观看 | 国产激情99 | 亚洲国产一区在线 | 亚洲a人| 日韩欧美精品 | 亚洲精品国产成人 | h漫在线观看 | 最新av在线播放 | 天天天操| 国产日韩欧美在线播放 | 久久99蜜桃综合影院免费观看 | 在线中文视频 | 日韩福利片 | 在线国产中文字幕 | 欧美网站一区二区 | 色婷婷精品久久二区二区蜜臂av | 成人性视频免费网站 | 一区免费| 亚洲精品久久久久久一区二区 | 在线看av网址 | 在线欧美a | 欧美a在线 | 国产精品久久久久久婷婷天堂 | 国产精品入口 | 午夜久久久 | 亚洲欧美精品 | 久久精彩视频 | 9久久婷婷国产综合精品性色 | 日韩一区二区三区四区五区 | 北条麻妃国产九九九精品小说 | 成人性视频免费网站 | 在线免费观看色 | 亚洲高清免费观看 | 羞羞的视频免费观看 | 国产目拍亚洲精品99久久精品 | 欧美极品一区二区 | xxx.在线观看| 国产资源网 | 日韩欧美一区在线 |