Apache Flink Talk Series (11) - Temporal Table JOIN
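I. What Is a Temporal Table
Temporal Table JOIN was mentioned in "Apache Flink Talk Series - JOIN LATERAL". This article explains in detail what a Temporal Table JOIN is.
The concept of temporal tables was introduced in ANSI-SQL 2011, and major database vendors such as Oracle, SQL Server and DB2 have since implemented this standard. A Temporal Table records every change made to its data at any point in its history. The workflow of a Temporal Table is as follows:
The figure above shows that a Temporal Table has the characteristics of an ordinary table, has its own distinctive DDL/DML/QUERY syntax, and treats time as its core attribute. History implies time, and time implies snapshots.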
II. ANSI-SQL 2011 Temporal Table Example
We use one DDL statement and a set of DML statements to explain how a Temporal Table works. Defining a PK in the DDL is optional; the example below does not define one:
1. DDL Example
```sql
CREATE TABLE Emp (
  ENo INTEGER,
  Sys_Start TIMESTAMP(12) GENERATED ALWAYS AS ROW START,
  Sys_End TIMESTAMP(12) GENERATED ALWAYS AS ROW END,
  EName VARCHAR(30),
  PERIOD FOR SYSTEM_TIME (Sys_Start, Sys_End)
) WITH SYSTEM VERSIONING
```
2. DML Examples
(1) INSERT
```sql
INSERT INTO Emp (ENo, EName) VALUES (22217, 'Joe')
```
Note: Sys_Start and Sys_End are filled in automatically by the database system.
(2) UPDATE
```sql
UPDATE Emp SET EName = 'Tom' WHERE ENo = 22217
```
Note: Assume the UPDATE is executed at 2012-02-03 10:00:00. Afterwards, the Sys_End of the previous value "Joe" changes from 9999-12-31 23:59:59 to 2012-02-03 10:00:00, which is also the moment from which the new value "Tom" is valid. So although we executed an UPDATE, the database now holds two rows with different values and different validity periods, that is, two different versions.
(3) DELETE (assume the table contents before the DELETE are as follows)
```sql
DELETE FROM Emp WHERE ENo = 22217
```
Note: Assume the DELETE is executed at 2012-06-01 00:00:00. The Sys_End value changes from 9999-12-31 23:59:59 to 2012-06-01 00:00:00. In other words, the DELETE does not physically remove the matching rows; instead, the system sets the Sys_End of each matching row to the time the DELETE was executed, marking the data as valid only up to that moment.
(4) SELECT
```sql
SELECT ENo, EName, Sys_Start, Sys_End FROM Emp
FOR SYSTEM_TIME AS OF TIMESTAMP '2011-01-02 00:00:00'
```
Note: This query returns all records with Sys_Start <= 2011-01-02 00:00:00 and Sys_End > 2011-01-02 00:00:00.
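To make these versioning rules concrete, here is a minimal in-memory sketch in plain Scala (no database involved). It assumes the semantics described in this section: INSERT opens a version ending at 9999-12-31 23:59:59, UPDATE closes the current version and opens a new one at the same instant, DELETE only closes the current version, and FOR SYSTEM_TIME AS OF t keeps rows with Sys_Start <= t and Sys_End > t. The class and method names are hypothetical.

```scala
import java.time.LocalDateTime

// One stored version of an Emp row (hypothetical model of SQL:2011 system versioning)
final case class EmpVersion(eno: Int, ename: String, sysStart: LocalDateTime, sysEnd: LocalDateTime)

final class SystemVersionedEmp {
  // Stand-in for 9999-12-31 23:59:59, the open end of the current version
  val EndOfTime: LocalDateTime = LocalDateTime.of(9999, 12, 31, 23, 59, 59)
  private var rows = Vector.empty[EmpVersion]

  def all: Vector[EmpVersion] = rows

  // INSERT: the system fills in Sys_Start = now and Sys_End = end of time
  def insert(eno: Int, ename: String, now: LocalDateTime): Unit =
    rows :+= EmpVersion(eno, ename, now, EndOfTime)

  // Closes the current version of `eno` at `now` and returns the closed rows
  private def close(eno: Int, now: LocalDateTime): Vector[EmpVersion] = {
    val (current, rest) = rows.partition(r => r.eno == eno && r.sysEnd == EndOfTime)
    rows = rest ++ current.map(_.copy(sysEnd = now))
    current
  }

  // UPDATE: close the old version and open a new one starting at the same instant
  def update(eno: Int, newName: String, now: LocalDateTime): Unit =
    close(eno, now).foreach(_ => rows :+= EmpVersion(eno, newName, now, EndOfTime))

  // DELETE: nothing is removed physically, only the validity period is closed
  def delete(eno: Int, now: LocalDateTime): Unit = { close(eno, now); () }

  // FOR SYSTEM_TIME AS OF t: Sys_Start <= t AND Sys_End > t
  def asOf(t: LocalDateTime): Vector[EmpVersion] =
    rows.filter(r => !r.sysStart.isAfter(t) && r.sysEnd.isAfter(t))
}
```

Because each period is half-open, an AS OF query at exactly 2012-02-03 10:00:00 sees "Tom" rather than "Joe", and after the DELETE both versions are still stored.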
III. SQL Server Temporal Table Example
1. DDL
```sql
CREATE TABLE Department
(
    DeptID int NOT NULL PRIMARY KEY CLUSTERED
  , DeptName varchar(50) NOT NULL
  , ManagerID INT NULL
  , ParentDeptID int NULL
  , SysStartTime datetime2 GENERATED ALWAYS AS ROW START NOT NULL
  , SysEndTime datetime2 GENERATED ALWAYS AS ROW END NOT NULL
  , PERIOD FOR SYSTEM_TIME (SysStartTime, SysEndTime)
)
WITH (SYSTEM_VERSIONING = ON);
```
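Executing the statement above creates both a current table and a history table in the database, as shown below:
Department is shown as system-versioned. The history table gets a default name, but you can also specify one, for example SYSTEM_VERSIONING = ON (HISTORY_TABLE = dbo.DepartmentHistory).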
2. DML
(1) INSERT - the inserted columns do not include SysStartTime and SysEndTime
```sql
INSERT INTO [dbo].[Department] ([DeptID], [DeptName], [ManagerID], [ParentDeptID])
VALUES (10, 'Marketing', 101, 1);
```
After execution we query the current table and the history table separately, as shown below:
The values from our first INSERT statement are valid from the moment of the operation, 2018-06-06 05:50:20.7913985, until forever (9999-12-31 23:59:59.9999999), and at this point the history table does not contain anything yet. Next, we perform an update.
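(2) UPDATE
```sql
UPDATE [dbo].[Department] SET [ManagerID] = 501 WHERE [DeptID] = 10
```
After execution, the current table is updated and a history record appears in the history table, as shown below:
Note that the SysStartTime of the current-table row has already changed, and the history table now has a record whose SysStartTime is the original SysStartTime of the current-table row and whose SysEndTime is the new SysStartTime of the current-table row. Let's update once more:
```sql
UPDATE [dbo].[Department] SET [ManagerID] = 201 WHERE [DeptID] = 10
```
At this point we can see that SQL Server stores Temporal Table data in a current table plus a history table, and internally manages data versions through the start time and end time of each row.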
(3) SELECT
```sql
SELECT [DeptID], [DeptName], [SysStartTime], [SysEndTime]
FROM [dbo].[Department]
FOR SYSTEM_TIME AS OF '2018-06-06 05:50:21.0000000';
```
Although the SELECT statement queries the Department table, the data actually returned comes from the history table, because the version valid at that moment has since been superseded. The underlying logic of the query is SysStartTime <= '2018-06-06 05:50:21.0000000' AND SysEndTime > '2018-06-06 05:50:21.0000000'.
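The current-table/history-table split can be sketched in plain Scala. This is a hypothetical model, not SQL Server's implementation: an UPDATE copies the old row into the history table with its SysEndTime set to the new row's SysStartTime, and an AS OF query searches both tables with the half-open predicate SysStartTime <= t AND SysEndTime > t. Class and method names are assumptions for illustration.

```scala
import java.time.LocalDateTime

// One Department row with its system-time period (hypothetical model)
final case class DeptRow(deptId: Int, managerId: Int, sysStart: LocalDateTime, sysEnd: LocalDateTime)

final class DepartmentTables {
  // 9999-12-31 23:59:59.9999999, the datetime2 value used for open rows
  val MaxTime: LocalDateTime = LocalDateTime.of(9999, 12, 31, 23, 59, 59, 999999900)
  var current: Map[Int, DeptRow] = Map.empty
  var history: Vector[DeptRow] = Vector.empty

  // INSERT: only the current table changes; the history table stays empty
  def insert(deptId: Int, managerId: Int, now: LocalDateTime): Unit =
    current += deptId -> DeptRow(deptId, managerId, now, MaxTime)

  // UPDATE: the old row moves to history, closed at the new row's SysStartTime
  def update(deptId: Int, managerId: Int, now: LocalDateTime): Unit =
    current.get(deptId).foreach { old =>
      history :+= old.copy(sysEnd = now)
      current += deptId -> DeptRow(deptId, managerId, now, MaxTime)
    }

  // FOR SYSTEM_TIME AS OF t: look in both tables for SysStartTime <= t < SysEndTime
  def asOf(t: LocalDateTime): Seq[DeptRow] =
    (current.values.toSeq ++ history).filter(r => !r.sysStart.isAfter(t) && r.sysEnd.isAfter(t))
}
```

With an insert at 2018-06-06 05:50:20.7913985 and two later updates, an AS OF query at 05:50:21 returns the ManagerID 101 row, which by then lives only in the history table.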
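IV. Apache Flink Temporal Table
As we have mentioned more than once, Apache Flink follows the ANSI-SQL standard, and its Temporal Table concept is also derived from the ANSI-SQL 2011 semantics. The current implementation, however, differs slightly from ANSI-SQL in syntax: ANSI-SQL 2011 uses FOR SYSTEM_TIME AS OF, as shown above, while Apache Flink currently uses LATERAL TABLE(TemporalTableFunction). This is something the community should be pushed to improve.
1. Why Temporal Tables Are Needed
We use a concrete query to show why Temporal Tables are needed. Suppose we have a continuously changing exchange rate table, RatesHistory, as follows:
RatesHistory gives exchange rates relative to the Yen (the Yen rate is 1) and is an append-only table whose rates keep changing. For example, the Euro-to-Yen rate was 114 from 09:00 to 10:45, and 116 from 10:45 to 11:15.
Suppose we want to output all current exchange rates as of 10:58. We need the following SQL query to compute the result table: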
```sql
SELECT *
FROM RatesHistory AS r
WHERE r.rowtime = (
  SELECT MAX(rowtime)
  FROM RatesHistory AS r2
  WHERE r2.currency = r.currency
  AND r2.rowtime <= '10:58');
```
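As a sanity check of what this query computes, the following plain-Scala sketch evaluates the same correlated subquery over the RatesHistory rows used in the Flink code below: a row survives only if its rowtime equals the latest rowtime of its currency at or before the cut-off. The object and method names are hypothetical; the nested scan mirrors the subquery literally and is quadratic in the number of rows.

```scala
import java.time.LocalTime

// One row of the append-only RatesHistory table
final case class Rate(rowtime: LocalTime, currency: String, rate: Long)

object LatestRates {
  // Same rows as the RatesHistory CSV data in the Flink code below
  val ratesHistory: Seq[Rate] = Seq(
    Rate(LocalTime.of(9, 0), "US Dollar", 102L),
    Rate(LocalTime.of(9, 0), "Euro", 114L),
    Rate(LocalTime.of(9, 0), "Yen", 1L),
    Rate(LocalTime.of(10, 45), "Euro", 116L),
    Rate(LocalTime.of(11, 15), "Euro", 119L),
    Rate(LocalTime.of(11, 49), "Pounds", 108L)
  )

  // Literal translation of the SQL: keep r if r.rowtime = MAX(r2.rowtime)
  // over rows r2 with the same currency and r2.rowtime <= cutOff
  def latestAsOf(history: Seq[Rate], cutOff: LocalTime): Seq[Rate] =
    history.filter { r =>
      val candidateTimes = history
        .filter(r2 => r2.currency == r.currency && !r2.rowtime.isAfter(cutOff))
        .map(_.rowtime.toSecondOfDay)
      candidateTimes.nonEmpty && r.rowtime.toSecondOfDay == candidateTimes.max
    }
}
```

With a 10:58 cut-off the sketch keeps US Dollar 102, Euro 116 and Yen 1; Pounds has no rate before 11:49 and does not appear.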
The corresponding Flink code is as follows. First, define the data source genRatesHistorySource:
```scala
import java.io.{File, FileOutputStream, OutputStreamWriter}

import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.table.sources.CsvTableSource

object CsvTableSourceUtils {

  def genRatesHistorySource: CsvTableSource = {
    val csvRecords = Seq(
      "rowtime ,currency ,rate",
      "09:00:00 ,US Dollar , 102",
      "09:00:00 ,Euro , 114",
      "09:00:00 ,Yen , 1",
      "10:45:00 ,Euro , 116",
      "11:15:00 ,Euro , 119",
      "11:49:00 ,Pounds , 108"
    )
    // Write the test data to a temporary file, using "$" as the row delimiter
    val tempFilePath =
      writeToTempFile(csvRecords.mkString("$"), "csv_source_", "tmp")
    // Create the source connector; all three fields are read as STRING
    new CsvTableSource(
      tempFilePath,
      Array("rowtime", "currency", "rate"),
      Array(Types.STRING, Types.STRING, Types.STRING),
      fieldDelim = ",",
      rowDelim = "$",
      ignoreFirstLine = true, // skip the header row
      ignoreComments = "%"
    )
  }

  // Writes `contents` to a new temporary file and returns its absolute path
  def writeToTempFile(
      contents: String,
      filePrefix: String,
      fileSuffix: String,
      charset: String = "UTF-8"): String = {
    val tempFile = File.createTempFile(filePrefix, fileSuffix)
    val tmpWriter = new OutputStreamWriter(new FileOutputStream(tempFile), charset)
    tmpWriter.write(contents)
    tmpWriter.close()
    tempFile.getAbsolutePath
  }
}
```
```scala
import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

def main(args: Array[String]): Unit = {
  // Streaming environment
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val tEnv = TableEnvironment.getTableEnvironment(env)
  // Parallelism 1 makes the output easier to inspect
  env.setParallelism(1)
  val sourceTableName = "RatesHistory"
  // Create the CSV source
  val tableSource = CsvTableSourceUtils.genRatesHistorySource
  // Register the source
  tEnv.registerTableSource(sourceTableName, tableSource)
  // Register a retract sink (MemoryRetractSink is a user-defined sink not shown in this article)
  val sinkTableName = "retractSink"
  val fieldNames = Array("rowtime", "currency", "rate")
  val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.STRING, Types.STRING)
  tEnv.registerTableSink(
    sinkTableName,
    fieldNames,
    fieldTypes,
    new MemoryRetractSink)
  val sql =
    """
      |SELECT *
      |FROM RatesHistory AS r
      |WHERE r.rowtime = (
      |  SELECT MAX(rowtime)
      |  FROM RatesHistory AS r2
      |  WHERE r2.currency = r.currency
      |  AND r2.rowtime <= '10:58:00' )
    """.stripMargin
  // Run the query
  val result = tEnv.sqlQuery(sql)
  // Write the result into the sink
  result.insertInto(sinkTableName)
  env.execute()
}
```
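The results in table form:
The Temporal Table concept is intended to simplify such queries and speed up their execution. A Temporal Table is a parameterized view over an append-only table: it interprets the changes to the append-only table as the table's changelog and provides the version of the table at a specific point in time (a temporal version). Interpreting an append-only table as a changelog requires a primary key attribute and a timestamp attribute. The primary key determines which rows are overwritten, and the timestamp determines the period during which a row is valid, that is, its data version; this matches the notion of validity periods in the SQL Server example above.
In the example above, currency is the primary key of the RatesHistory table and rowtime is the timestamp attribute.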
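The changelog interpretation can be sketched as a parameterized view in plain Scala: replay the append-only rows in timestamp order up to time t, and let each row overwrite the earlier row with the same primary key. The class name TemporalView and its constructor parameters are hypothetical; the two extractor functions loosely correspond to the time attribute and primary key that Flink asks for.

```scala
// Hypothetical sketch of a temporal table as a parameterized view over an append-only history
final class TemporalView[R, K](history: Seq[R], timeOf: R => Long, keyOf: R => K) {
  // Version of the table at time t: replay rows with time <= t in time order,
  // a later row with the same primary key overwriting the earlier one
  def apply(t: Long): Map[K, R] =
    history
      .filter(r => timeOf(r) <= t)
      .sortBy(timeOf)
      .foldLeft(Map.empty[K, R])((snapshot, r) => snapshot + (keyOf(r) -> r))
}
```

For example, with the event-time rate rows used later in this article ((US Dollar, 102, 1), (Euro, 114, 1), (Yen, 1, 1), (Euro, 116, 5), (Euro, 119, 7)) keyed by currency, the view at time 4 maps Euro to 114 and the view at time 5 maps Euro to 116.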
2. How to Define a Temporal Table
Apache Flink extends the TableFunction interface, adding a time attribute and a primary key attribute on top of it.
(1) The internal TemporalTableFunction is defined as follows:
```scala
class TemporalTableFunction private(
    @transient private val underlyingHistoryTable: Table,
    // Time attribute, which acts as the version information
    private val timeAttribute: Expression,
    // Primary key definition
    private val primaryKey: String,
    private val resultType: RowTypeInfo)
  extends TableFunction[Row] {
  ...
}
```
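(2) How users create a TemporalTableFunction
A createTemporalTableFunction method has been added to Table. It takes the time attribute and the primary key as arguments, and its interface is defined as follows: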
```scala
// Creates TemporalTableFunction backed up by this table as a history table.
def createTemporalTableFunction(
    timeAttribute: Expression,
    primaryKey: Expression): TemporalTableFunction = {
  ...
}
```
Users obtain a TemporalTableFunction instance by calling it as follows:
```scala
val tab = ...
val temporalTableFunction = tab.createTemporalTableFunction('time, 'pk)
...
```
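3. Example
(1) Requirements
Suppose we have an orders table, Orders, and an exchange rate table, Rates. The orders come from different regions and are paid in different currencies, and we want to compute, for each order, its amount in Yen at the exchange rate in effect when the order was placed.
(2) Orders data
(3) Rates data
(4) SQL for the requirement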
```sql
SELECT o.currency, o.amount, r.rate,
  o.amount * r.rate AS yen_amount
FROM
  Orders AS o,
  LATERAL TABLE (Rates(o.rowtime)) AS r
WHERE r.currency = o.currency
```
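The join semantics of this query can be checked with a small plain-Scala sketch that uses the order and rate rows from the code in the next section (timestamps in milliseconds). It assumes that a rate version becomes valid at its own timestamp, so each order picks the latest rate for its currency with rate time <= order time; orders without a matching rate are dropped, as in an inner join. All names are hypothetical.

```scala
// Rows of the Orders and RatesHistory tables (timestamps in milliseconds)
final case class Order(amount: Long, currency: String, rowtime: Long)
final case class RateRow(currency: String, rate: Long, rowtime: Long)

object TemporalJoinSemantics {
  // For each order: the latest rate of the same currency with rate.rowtime <= order.rowtime.
  // Output: (currency, amount, rate, yen_amount)
  def join(orders: Seq[Order], rates: Seq[RateRow]): Seq[(String, Long, Long, Long)] =
    orders.flatMap { o =>
      val versions = rates.filter(r => r.currency == o.currency && r.rowtime <= o.rowtime)
      if (versions.isEmpty) None
      else {
        val r = versions.maxBy(_.rowtime)
        Some((o.currency, o.amount, r.rate, o.amount * r.rate))
      }
    }
}
```

On that data the sketch yields (Euro, 2, 114, 228), (US Dollar, 1, 102, 102), (Yen, 50, 1, 50) and (Euro, 3, 116, 348); under the stated assumption, the second Euro order at time 5 already uses the rate 116 that becomes valid at time 5.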
(5) Expected results
4. Implementation Without a Connector
```scala
import java.sql.Timestamp

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

import scala.collection.mutable

object TemporalTableJoinTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)
    env.setParallelism(1)
    // Use event time as the time characteristic
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // Build the order data: (amount, currency, order time)
    val ordersData = new mutable.MutableList[(Long, String, Timestamp)]
    ordersData.+=((2L, "Euro", new Timestamp(2L)))
    ordersData.+=((1L, "US Dollar", new Timestamp(3L)))
    ordersData.+=((50L, "Yen", new Timestamp(4L)))
    ordersData.+=((3L, "Euro", new Timestamp(5L)))
    // Build the rate data: (currency, rate, time from which the rate is valid)
    val ratesHistoryData = new mutable.MutableList[(String, Long, Timestamp)]
    ratesHistoryData.+=(("US Dollar", 102L, new Timestamp(1L)))
    ratesHistoryData.+=(("Euro", 114L, new Timestamp(1L)))
    ratesHistoryData.+=(("Yen", 1L, new Timestamp(1L)))
    ratesHistoryData.+=(("Euro", 116L, new Timestamp(5L)))
    ratesHistoryData.+=(("Euro", 119L, new Timestamp(7L)))
    // Extract event time for the orders table
    val orders = env
      .fromCollection(ordersData)
      .assignTimestampsAndWatermarks(new OrderTimestampExtractor[Long, String]())
      .toTable(tEnv, 'amount, 'currency, 'rowtime.rowtime)
    // Extract event time for the rates table
    val ratesHistory = env
      .fromCollection(ratesHistoryData)
      .assignTimestampsAndWatermarks(new OrderTimestampExtractor[String, Long]())
      .toTable(tEnv, 'currency, 'rate, 'rowtime.rowtime)
    // Register the orders table and the rates table
    tEnv.registerTable("Orders", orders)
    tEnv.registerTable("RatesHistory", ratesHistory)
    val tab = tEnv.scan("RatesHistory")
    // Create the TemporalTableFunction: time attribute rowtime, primary key currency
    val temporalTableFunction = tab.createTemporalTableFunction('rowtime, 'currency)
    // Register the TemporalTableFunction under the name used in the SQL
    tEnv.registerFunction("Rates", temporalTableFunction)
    val sqlQuery =
      """
        |SELECT o.currency, o.amount, r.rate,
        |  o.amount * r.rate AS yen_amount
        |FROM
        |  Orders AS o,
        |  LATERAL TABLE (Rates(o.rowtime)) AS r
        |WHERE r.currency = o.currency
        |""".stripMargin
    tEnv.registerTable("TemporalJoinResult", tEnv.sqlQuery(sqlQuery))
    val result = tEnv.scan("TemporalJoinResult").toAppendStream[Row]
    // Print the query result
    result.print()
    env.execute()
  }
}
```
Before running the code above, pay attention to how it extracts event time. In Apache Flink's TimeCharacteristic.EventTime mode you must call assignTimestampsAndWatermarks to specify how event time is generated. This mechanism is very flexible: users control both the event-time value of their business data and how watermarks are generated. For more on watermarks, see "Apache Flink Talk Series (03) - Watermark". The complete event-time extraction code for this example is as follows:
```scala
import java.sql.Timestamp

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time

// Uses the third tuple field (a Timestamp) as the event time and tolerates
// up to 10 seconds of out-of-order data when generating watermarks
class OrderTimestampExtractor[T1, T2]
  extends BoundedOutOfOrdernessTimestampExtractor[(T1, T2, Timestamp)](Time.seconds(10)) {
  override def extractTimestamp(element: (T1, T2, Timestamp)): Long = {
    element._3.getTime
  }
}
```
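The watermark rule behind BoundedOutOfOrdernessTimestampExtractor can be sketched in plain Scala: the watermark trails the largest timestamp seen so far by the configured bound, and an out-of-order element never moves it backwards. This is a simplified re-implementation for illustration only (the class name is hypothetical), not Flink's source.

```scala
// Simplified model of a bounded-out-of-orderness watermark generator
final class BoundedOutOfOrdernessSketch(maxOutOfOrdernessMs: Long) {
  // Start so that subtracting the bound cannot underflow below Long.MinValue
  private var currentMax: Long = Long.MinValue + maxOutOfOrdernessMs

  // Called for every element with its extracted event timestamp
  def onElement(timestampMs: Long): Unit =
    currentMax = math.max(currentMax, timestampMs)

  // Watermark = largest timestamp seen so far minus the allowed lateness
  def currentWatermark: Long = currentMax - maxOutOfOrdernessMs
}
```

With the 10-second bound used above and the millisecond timestamps of the sample data, the watermark stays far behind the data, for example 5 - 10000 = -9995 after the element with timestamp 5.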
The run results:
5. Implementation With a CSV Connector
In real production development you need actual connector definitions, so below we build the Temporal Table JOIN demo with CSV-format connectors.
(1) genEventRatesHistorySource
```scala
def genEventRatesHistorySource: CsvTableSource = {
  val csvRecords = Seq(
    "ts#currency#rate",
    "1#US Dollar#102",
    "1#Euro#114",
    "1#Yen#1",
    "3#Euro#116",
    "5#Euro#119",
    "7#Pounds#108"
  )
  // Write the test data to a temporary file
  val tempFilePath =
    FileUtils.writeToTempFile(csvRecords.mkString(CommonUtils.line), "csv_source_rate", "tmp")
  // Create the source connector
  new CsvTableSource(
    tempFilePath,
    Array("ts", "currency", "rate"),
    Array(
      Types.LONG, Types.STRING, Types.LONG
    ),
    fieldDelim = "#",
    rowDelim = CommonUtils.line,
    ignoreFirstLine = true,
    ignoreComments = "%"
  )
}
```
(2) genRatesOrderSource
```scala
def genRatesOrderSource: CsvTableSource = {
  val csvRecords = Seq(
    "ts#currency#amount",
    "2#Euro#10",
    "4#Euro#10"
  )
  // Write the test data to a temporary file
  val tempFilePath =
    FileUtils.writeToTempFile(csvRecords.mkString(CommonUtils.line), "csv_source_order", "tmp")
  // Create the source connector
  new CsvTableSource(
    tempFilePath,
    Array("ts", "currency", "amount"),
    Array(
      Types.LONG, Types.STRING, Types.LONG
    ),
    fieldDelim = "#",
    rowDelim = CommonUtils.line,
    ignoreFirstLine = true,
    ignoreComments = "%"
  )
}
```
(3) Complete code of the CsvTableSourceUtils utility object
```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.flink.book.connectors

import java.io.File

import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.book.utils.{CommonUtils, FileUtils}
import org.apache.flink.table.sinks.{CsvTableSink, TableSink}
import org.apache.flink.table.sources.CsvTableSource
import org.apache.flink.types.Row

object CsvTableSourceUtils {
  def genWordCountSource: CsvTableSource = {
    val csvRecords = Seq(
      "words",
      "Hello Flink",
      "Hi, Apache Flink",
      "Apache FlinkBook"
    )
    // Write the test data to a temporary file
    val tempFilePath =
      FileUtils.writeToTempFile(csvRecords.mkString("$"), "csv_source_", "tmp")
    // Create the source connector
    new CsvTableSource(
      tempFilePath,
      Array("words"),
      Array(
        Types.STRING
      ),
      fieldDelim = "#",
      rowDelim = "$",
      ignoreFirstLine = true,
      ignoreComments = "%"
    )
  }

  def genRatesHistorySource: CsvTableSource = {
    val csvRecords = Seq(
      "rowtime ,currency ,rate",
      "09:00:00 ,US Dollar , 102",
      "09:00:00 ,Euro , 114",
      "09:00:00 ,Yen , 1",
      "10:45:00 ,Euro , 116",
      "11:15:00 ,Euro , 119",
      "11:49:00 ,Pounds , 108"
    )
    // Write the test data to a temporary file
    val tempFilePath =
      FileUtils.writeToTempFile(csvRecords.mkString("$"), "csv_source_", "tmp")
    // Create the source connector
    new CsvTableSource(
      tempFilePath,
      Array("rowtime", "currency", "rate"),
      Array(
        Types.STRING, Types.STRING, Types.STRING
      ),
      fieldDelim = ",",
      rowDelim = "$",
      ignoreFirstLine = true,
      ignoreComments = "%"
    )
  }

  def genEventRatesHistorySource: CsvTableSource = {
    val csvRecords = Seq(
      "ts#currency#rate",
      "1#US Dollar#102",
      "1#Euro#114",
      "1#Yen#1",
      "3#Euro#116",
      "5#Euro#119",
      "7#Pounds#108"
    )
    // Write the test data to a temporary file
    val tempFilePath =
      FileUtils.writeToTempFile(csvRecords.mkString(CommonUtils.line), "csv_source_rate", "tmp")
    // Create the source connector
    new CsvTableSource(
      tempFilePath,
      Array("ts", "currency", "rate"),
      Array(
        Types.LONG, Types.STRING, Types.LONG
      ),
      fieldDelim = "#",
      rowDelim = CommonUtils.line,
      ignoreFirstLine = true,
      ignoreComments = "%"
    )
  }

  def genRatesOrderSource: CsvTableSource = {
    val csvRecords = Seq(
      "ts#currency#amount",
      "2#Euro#10",
      "4#Euro#10"
    )
    // Write the test data to a temporary file
    val tempFilePath =
      FileUtils.writeToTempFile(csvRecords.mkString(CommonUtils.line), "csv_source_order", "tmp")
    // Create the source connector
    new CsvTableSource(
      tempFilePath,
      Array("ts", "currency", "amount"),
      Array(
        Types.LONG, Types.STRING, Types.LONG
      ),
      fieldDelim = "#",
      rowDelim = CommonUtils.line,
      ignoreFirstLine = true,
      ignoreComments = "%"
    )
  }

  /**
   * Example:
   * genCsvSink(
   *   Array[String]("word", "count"),
   *   Array[TypeInformation[_]](Types.STRING, Types.LONG))
   */
  def genCsvSink(fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): TableSink[Row] = {
    val tempFile = File.createTempFile("csv_sink_", "tem")
    if (tempFile.exists()) {
      tempFile.delete()
    }
    new CsvTableSink(tempFile.getAbsolutePath).configure(fieldNames, fieldTypes)
  }
}
```
The run results are as follows:
6. Internal Implementation
We again use the orders-and-rates example to explain how Apache Flink implements Temporal Table JOIN internally, as shown in the figure below:
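The idea can be sketched in plain Scala as follows, under simplifying assumptions: rate rows are stored as versioned state per currency and never trigger output, order rows are buffered until the watermark passes their timestamp, and each released order is then joined with the latest rate version at or before its timestamp. This is an illustrative model, not Flink's actual operator; all names are hypothetical and state cleanup is omitted.

```scala
import scala.collection.mutable

final case class OrderEv(amount: Long, currency: String, ts: Long)
final case class RateEv(currency: String, rate: Long, ts: Long)

final class TemporalJoinSketch {
  // Versioned rate state per currency: (valid-from timestamp, rate)
  private val rateVersions = mutable.Map.empty[String, Vector[(Long, Long)]]
  // Orders waiting for the watermark to pass their timestamp
  private val pendingOrders = mutable.ArrayBuffer.empty[OrderEv]

  // Rate side (passive): only updates state, never emits output
  def onRate(r: RateEv): Unit =
    rateVersions(r.currency) = rateVersions.getOrElse(r.currency, Vector.empty) :+ (r.ts -> r.rate)

  // Order side (driving): buffered until the watermark says earlier rates have arrived
  def onOrder(o: OrderEv): Unit =
    pendingOrders += o

  // On a watermark, join every buffered order with ts <= watermark against the latest
  // rate version valid at the order's timestamp; emits (currency, amount, yen amount)
  def onWatermark(watermark: Long): Seq[(String, Long, Long)] = {
    val (ready, waiting) = pendingOrders.partition(_.ts <= watermark)
    pendingOrders.clear()
    pendingOrders ++= waiting
    ready.sortBy(_.ts).toSeq.flatMap { o =>
      rateVersions
        .getOrElse(o.currency, Vector.empty)
        .filter { case (validFrom, _) => validFrom <= o.ts }
        .sortBy { case (validFrom, _) => validFrom }
        .lastOption
        .map { case (_, rate) => (o.currency, o.amount, o.amount * rate) }
    }
  }
}
```

Buffering until the watermark is what lets an order at time 5 see a Euro rate for time 5 even if that rate row arrives after the order row.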
V. Temporal Table JOIN vs. Dual-Stream JOIN vs. LATERAL JOIN
In "Apache Flink Talk Series (09) - JOIN Operator" we introduced the dual-stream JOIN, and in "Apache Flink Talk Series (10) - JOIN LATERAL" we introduced JOIN LATERAL(TableFunction). So what is the essential difference between the Temporal Table JOIN introduced here and the dual-stream JOIN or JOIN LATERAL(TableFunction)?
- Dual-stream JOIN: essentially Stream JOIN Stream, driven by both streams.
- LATERAL JOIN: essentially Stream JOIN Table Function, driven by a single stream.
- Temporal Table JOIN: essentially Stream JOIN Temporal Table, or Stream JOIN Table with snapshot. It is driven by a single stream, and the Temporal Table is queried passively.
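1. Temporal Table JOIN vs. LATERAL JOIN
Functionally, both Temporal Table JOIN and LATERAL JOIN obtain multiple rows for each row of the left stream; both are driven by a single stream, and both query the other side passively. So what is the most essential difference between them? The key point is state management: a LATERAL JOIN uses a TableFunction, which has no state management, and its data has no notion of versions, whereas a Temporal Table JOIN joins against a table that carries version information.
2. Temporal Table JOIN vs. Dual-Stream JOIN
Both Temporal Table JOIN and dual-stream JOIN manage state, so what is the essential difference between them? It is what drives the computation. A Temporal Table JOIN is driven by one side only, and the Temporal Table is queried passively, whereas a dual-stream JOIN is driven by both sides, each of which actively triggers the JOIN computation.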
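The difference in driving sides can be illustrated by replaying one event sequence through both styles of join. In this hypothetical plain-Scala sketch, the dual-stream join stores both sides and emits whenever either side receives a row, while the temporal-style join only updates the latest rate on rate events and emits only on order events. Event time and versioning are deliberately ignored here for brevity.

```scala
sealed trait Event
final case class OrderE(amount: Long, currency: String) extends Event
final case class RateE(currency: String, rate: Long) extends Event

object DrivingSides {
  // Dual-stream join: both sides are stored, and an arrival on either side
  // joins with everything already stored on the other side; emits (amount, rate)
  def regularJoin(events: Seq[Event]): Seq[(Long, Long)] = {
    var orders = Vector.empty[OrderE]
    var rates = Vector.empty[RateE]
    events.flatMap {
      case o: OrderE =>
        orders :+= o
        rates.filter(_.currency == o.currency).map(r => (o.amount, r.rate))
      case r: RateE =>
        rates :+= r
        orders.filter(_.currency == r.currency).map(o => (o.amount, r.rate))
    }
  }

  // Temporal-style join: rate events only update the current version (passive side);
  // only order events produce output
  def temporalJoin(events: Seq[Event]): Seq[(Long, Long)] = {
    var latest = Map.empty[String, Long]
    events.flatMap {
      case OrderE(amount, currency) =>
        latest.get(currency).map(rate => (amount, rate)).toSeq
      case RateE(currency, rate) =>
        latest += currency -> rate
        Seq.empty[(Long, Long)]
    }
  }
}
```

For the sequence Euro rate 114, Euro order 2, Euro rate 116, Euro order 3, the dual-stream join emits four pairs (the new rate is also joined with the old order), while the temporal-style join emits only (2, 114) and (3, 116).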
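3. Improving Temporal Table JOIN
In my view, Apache Flink's Temporal Table JOIN should follow the ANSI-SQL standard in both syntax and semantics, and I will push the community to support the standard ANSI-SQL FOR SYSTEM_TIME AS OF syntax for Temporal Tables. The processing logic after the improvement is sketched below:
Here the cache is a performance optimization; the details will be covered once the community has completed this work.
VI. Summary
This article opened with the support for Temporal Tables in the ANSI-SQL standard and in SQL Server, then introduced the current state of Temporal Table support in Apache Flink, using code examples and diagrams of the internal processing logic to give readers an intuitive feel for the syntax and semantics of Temporal Table JOIN.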
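About Likes and Comments
This series inevitably has many flaws and shortcomings. If an article was useful to you, a like would be a welcome encouragement; if an article falls short, your feedback and suggestions are sincerely welcome. Thank you all in advance!
Author: Sun Jincheng (孫金城), alias Jinzhu (金竹), currently works at Alibaba and has been working since 2015 on the design and development of Blink, Alibaba's computing platform based on Apache Flink.
[This article is an original contribution by 51CTO columnist "Jinzhu" (金竹); please contact the original author for permission to reprint.]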