流批一體的實(shí)時(shí)特征工程平臺(tái)建設(shè)實(shí)踐
本次分享分為四大部分,第一部分總體介紹 FeatHub 在特征開(kāi)發(fā)、部署、監(jiān)控、分享過(guò)程中面臨的場(chǎng)景、目標(biāo)、痛點(diǎn)和挑戰(zhàn);第二部分介紹 FeatHub 的架構(gòu)思路實(shí)踐,及相關(guān)核心概念;第三部分介紹 FeatHub 在使用過(guò)程中的 API 基本使用、基本計(jì)算功能,樣例場(chǎng)景的代碼實(shí)踐,還有性能優(yōu)化,未來(lái)的擴(kuò)展目標(biāo),以及開(kāi)源社區(qū)的共建,提供項(xiàng)目的學(xué)習(xí)、開(kāi)發(fā)使用,還將分享 FeatHub 歷史數(shù)據(jù)的回放功能, 支持離線、近線、在線處理和阿里云上下游組件的支持等問(wèn)題。
一、為什么需要 FeatHub
1、目標(biāo)場(chǎng)景
(1)需要 Python 環(huán)境的數(shù)據(jù)科學(xué)家
今天大部分流行的機(jī)器學(xué)習(xí)的推理和訓(xùn)練程序基本都是由數(shù)據(jù)科學(xué)家用 Python 來(lái)編寫的,比如流行的 TensorFlow、PyTorch 以及一些傳統(tǒng)機(jī)器學(xué)習(xí)場(chǎng)景中用到的 scikit-learn 等等。我們希望支持?jǐn)?shù)據(jù)科學(xué)家繼續(xù)使用熟悉的 Python 編寫特征工程代碼來(lái)完成端到端機(jī)器學(xué)習(xí)鏈路的開(kāi)發(fā)與部署,并且能夠使用他們所熟悉的 Python 生態(tài)環(huán)境中的庫(kù)。
(2)生成實(shí)時(shí)特征
越來(lái)越多的機(jī)器學(xué)習(xí)應(yīng)用在往實(shí)時(shí)方向發(fā)展,通過(guò)實(shí)時(shí)處理可以提高機(jī)器學(xué)習(xí)的效率和準(zhǔn)確度。為了達(dá)到目標(biāo),需要生成實(shí)時(shí)特征。這里不僅僅是去實(shí)時(shí)獲取查詢特征,而是要實(shí)時(shí)生成特征。例如需要實(shí)時(shí)獲取用戶在最近兩分鐘內(nèi)的點(diǎn)擊次數(shù),為此需要使用流式計(jì)算引擎完成實(shí)時(shí)特征計(jì)算。
(3)需要開(kāi)源方案支持多云部署
越來(lái)越多的中小型公司希望做到多云部署,以得到生產(chǎn)的安全保證,以及獲得云廠商之間的競(jìng)價(jià)優(yōu)勢(shì)。因此我們的方案不要求用戶綁定一個(gè)云廠商,而是要讓用戶能夠自由地在不同云廠商之間做選擇,甚至在私有云部署特征工程作業(yè)。
這是 FeatHub 項(xiàng)目設(shè)立之初所希望滿足的一些條件。
2、實(shí)時(shí)特征工程的痛點(diǎn)
今天已經(jīng)有很多公司在開(kāi)發(fā)實(shí)時(shí)特征工程作業(yè)。其中存在一些痛點(diǎn),涵蓋了特征的整個(gè)生命周期,包含開(kāi)發(fā)、部署、監(jiān)控、以及之后的分享。
(1)開(kāi)發(fā)難度高
① 特征穿越
開(kāi)發(fā)階段,用的比較多的是實(shí)時(shí)特征框架 Apache Flink,因?yàn)?Flink 已經(jīng)基本上是實(shí)時(shí)流計(jì)算的事實(shí)標(biāo)準(zhǔn),但是用 Flink 或者類似的框架來(lái)開(kāi)發(fā)實(shí)時(shí)特征存在著需要解決特征穿越的難點(diǎn)。很多數(shù)據(jù)科學(xué)家并不了解特征穿越的解決經(jīng)驗(yàn),并且需要比較多的學(xué)習(xí)時(shí)間和成本來(lái)解決這類問(wèn)題,這是開(kāi)發(fā)階段的主要痛點(diǎn)。
(2)部署難度高
① 需要手動(dòng)翻譯
很多公司會(huì)有一個(gè)專門的平臺(tái)團(tuán)隊(duì)把數(shù)據(jù)科學(xué)家寫的單進(jìn)程 Python 作業(yè)翻譯成可分布式執(zhí)行的 Flink 或者 Spark 作業(yè),來(lái)實(shí)現(xiàn)高性能高可用的部署。其翻譯過(guò)程會(huì)增加整個(gè)開(kāi)發(fā)生命周期長(zhǎng)度。并且因?yàn)檫€需要額外的人力去做翻譯工作,增加了開(kāi)發(fā)成本,更進(jìn)一步帶來(lái)了引入 Bug 的可能。另一撥人將數(shù)據(jù)科學(xué)家的作業(yè)翻譯之后的邏輯未必和原先的邏輯保持一致,這樣就帶來(lái)更多的 Debug 工作量。
(3)監(jiān)控難度高
① 特征分布變化
特征工程作業(yè)的整個(gè)質(zhì)量和效率不只是取決于作業(yè)有沒(méi)有 Bug,還依賴于上游的輸入數(shù)據(jù)數(shù)值分布能滿足一些特性,例如能接近于訓(xùn)練時(shí)的數(shù)據(jù)數(shù)值分布。很多作業(yè)的推理效果下降,經(jīng)常是由于上游作業(yè)生產(chǎn)的數(shù)據(jù)分布發(fā)生了變化。這種情況下,需要開(kāi)發(fā)者去追蹤整個(gè)鏈路,一段段去看在哪個(gè)地方的特征數(shù)據(jù)分布發(fā)生了變化,根據(jù)具體情況再去看是否需要重新訓(xùn)練或者解決 Bug。這部分人力工作量過(guò)大也是一個(gè)痛點(diǎn)。
(4)分享難度高
① 開(kāi)發(fā)工作重復(fù)
雖然很多特征計(jì)算作業(yè)的開(kāi)發(fā)團(tuán)隊(duì)和場(chǎng)景不同,但其實(shí)用了類似甚至相同的特征定義。很多公司中沒(méi)有一個(gè)很好的渠道,讓公司內(nèi)不同團(tuán)隊(duì)能查詢和復(fù)用已有特征。這就導(dǎo)致不同團(tuán)隊(duì)經(jīng)常需要做重復(fù)開(kāi)發(fā),甚至對(duì)于相同特征需要重復(fù)跑作業(yè)去生成一些特征。這帶來(lái)了人力和計(jì)算/儲(chǔ)存資源的浪費(fèi),因?yàn)樾枰嗟挠?jì)算、內(nèi)存、存儲(chǔ)空間去生成相同特征。
② point-in-time correct 語(yǔ)義
為了讓大家能夠理解什么叫特征穿越,上圖給出了一個(gè)簡(jiǎn)單例子,來(lái)展現(xiàn)這個(gè)問(wèn)題。圖左上表是用戶的一個(gè)行為特征,表達(dá)了在不同時(shí)間節(jié)點(diǎn),對(duì)于一個(gè)給定 ID 的用戶,在最近兩分鐘內(nèi)的點(diǎn)擊數(shù)。這個(gè)點(diǎn)擊數(shù)可能幫助我們推理用戶是否會(huì)點(diǎn)擊某個(gè)廣告。為了用這些特征去做訓(xùn)練,通常需要將特征拼接到用戶帶有 Label 的一些數(shù)據(jù)集上。圖左下表展現(xiàn)的是一個(gè)用戶實(shí)際有沒(méi)有點(diǎn)擊廣告的一些正樣本和負(fù)樣本的數(shù)據(jù)集,標(biāo)注了在不同的時(shí)間點(diǎn),用戶所產(chǎn)生的正樣本或負(fù)樣本。為了將這兩個(gè)數(shù)據(jù)集中的特征拼接起來(lái),形成訓(xùn)練用的數(shù)據(jù)集,通常需要根據(jù)用戶 ID 作為 key 進(jìn)行特征拼接。如果只是簡(jiǎn)單地進(jìn)行 Table Join,不考慮時(shí)間戳,就可能產(chǎn)生特征穿越問(wèn)題。 例如在 6:03 分時(shí),用戶最近 2 分鐘點(diǎn)擊數(shù)應(yīng)該是 10,但拼接得到的特征值可能是來(lái)自 7:00 分時(shí)的 6。這種特征穿越會(huì)帶來(lái)實(shí)際推理效果的下降。一個(gè)具有 point-in-time correct 語(yǔ)義的 Join 結(jié)果應(yīng)該如下圖所示:
為了在樣本拼接時(shí)避免特征穿越,對(duì)于在上圖左表中的每一條數(shù)據(jù),應(yīng)該在維表的多個(gè)版本特征當(dāng)中找到時(shí)間戳小于并且最接近于左表中的時(shí)間戳的特征數(shù)值,并將其拼接到最終生成的訓(xùn)練數(shù)據(jù)集上。這樣一個(gè)具有 point-in-time correct 語(yǔ)義的拼接,將產(chǎn)生上圖右邊所顯示的訓(xùn)練數(shù)據(jù)集。針對(duì)不同的時(shí)間點(diǎn),都有所對(duì)應(yīng)最近兩分鐘內(nèi)產(chǎn)生的特征值。這樣生成的訓(xùn)練數(shù)據(jù)集可以提高訓(xùn)練和推理的效果。
3、Feature Store 的核心場(chǎng)景
接下來(lái)介紹 FeatHub 作為一個(gè) Feature Store,對(duì)于整個(gè)特征開(kāi)發(fā)周期的每一階段試圖解決的問(wèn)題和提供的工具。
(1)特征開(kāi)發(fā)
在特征開(kāi)發(fā)階段,F(xiàn)eatHub 會(huì)提供一個(gè)基于 Python 的具有高易用性的 SDK,讓用戶能簡(jiǎn)潔地表達(dá)特征的計(jì)算邏輯。特征計(jì)算本質(zhì)是一個(gè)特征的 ETL。開(kāi)發(fā)階段最重要的是 SDK 的易用性和簡(jiǎn)潔性。
(2)特征部署
在特征部署階段,F(xiàn)eatHub 會(huì)提供執(zhí)行引擎,實(shí)現(xiàn)高性能,低延遲的特征計(jì)算邏輯的部署,并且能對(duì)接不同的特征存儲(chǔ)。部署階段最重要的是執(zhí)行引擎的性能和對(duì)接不同特征存儲(chǔ)的能力。
(3)特征告警
在特征監(jiān)控階段,為了方便開(kāi)發(fā)者及時(shí)發(fā)現(xiàn)特征數(shù)值分布的變化并做出應(yīng)對(duì),F(xiàn)eatHub 將來(lái)會(huì)產(chǎn)生一些常用指標(biāo)來(lái)覆蓋常見(jiàn)的特征質(zhì)量問(wèn)題,例如具有非法數(shù)值的特征比例,或者特征平均值,并根據(jù)這些指標(biāo)進(jìn)行報(bào)警,去及時(shí)通知負(fù)責(zé)人調(diào)查相關(guān)特征分布變化的原因和做出應(yīng)對(duì),來(lái)維護(hù)端到端的推薦鏈路的效果。
(4)特征分享
在特征分享階段,F(xiàn)eatHub 將來(lái)會(huì)提供特征的注冊(cè)和搜索能力,支持同一公司內(nèi)不同團(tuán)隊(duì)的開(kāi)發(fā)人員去查詢自己想要的特征是不是已經(jīng)存在,并復(fù)用這些特征定義和已經(jīng)產(chǎn)生的特征數(shù)據(jù)。
上圖中說(shuō)明 FeatHub 的核心特點(diǎn)。在開(kāi)發(fā)階段,F(xiàn)eatHub 能提供簡(jiǎn)單易用的 SDK,支持具有 point-in-time correct 語(yǔ)義的特征拼接,特征聚合等邏輯。在部署階段,F(xiàn)eatHub 能支持高吞吐、低延遲的特征生成,支持使用 Flink 作為執(zhí)行引擎來(lái)計(jì)算特征;并且能支持多種特征存儲(chǔ)系統(tǒng),方便用戶自由選擇所希望使用的存儲(chǔ)類型。在監(jiān)控階段, FeatHub 將能提供實(shí)時(shí)指標(biāo)來(lái)監(jiān)控特征分布的變化,包含離線和實(shí)時(shí)監(jiān)控,方便開(kāi)發(fā)者及時(shí)發(fā)現(xiàn)問(wèn)題。在分享階段,F(xiàn)eatHub 將會(huì)提供簡(jiǎn)單易用的 Web UI 以及 SDK,支持開(kāi)發(fā)者注冊(cè),搜索和復(fù)用特征。
在 Feature Store 領(lǐng)域內(nèi)已經(jīng)有一些具有代表性的 Feature Store 項(xiàng)目,例如今年初 LinkedIn 開(kāi)源的 Feathr,以及開(kāi)源了多年的 Feast。我們調(diào)研了這些項(xiàng)目,發(fā)現(xiàn)他們并不能很好地達(dá)成我們提出的目標(biāo)場(chǎng)景。
FeatHub 相比現(xiàn)有方案,帶來(lái)的額外價(jià)值包括:
① 簡(jiǎn)單易用的 Python SDK。FeatHub 的 SDK 參考了已有的 Feature Store 項(xiàng)目的 SDK,能支持這些項(xiàng)目的核心功能,并進(jìn)一步提升了 SDK 的抽象能力和易用性,
② 支持單機(jī)上的開(kāi)發(fā)和實(shí)驗(yàn)。開(kāi)發(fā)者不需要對(duì)接分布式的 Flink 或 Spark 集群來(lái)跑實(shí)驗(yàn),而只需要使用單機(jī)上的 CPU 或者內(nèi)存資源就可以進(jìn)行開(kāi)發(fā)和實(shí)驗(yàn),并能使用 scikit-learn 等單機(jī)上的機(jī)器學(xué)習(xí)算法庫(kù)。
③ 無(wú)需修改代碼即可切換執(zhí)行引擎。當(dāng)用戶完成單機(jī)上的開(kāi)發(fā)后,可以將單機(jī)執(zhí)行引擎切換到 Flink 或 Spark 等分布式執(zhí)行引擎,而無(wú)需修改表達(dá)特征計(jì)算邏輯的代碼。使用 Flink 作為執(zhí)行引擎可以讓 Feathub 支持高吞吐、低延時(shí)的實(shí)時(shí)特征計(jì)算。FeatHub 將來(lái)會(huì)進(jìn)一步支持使用 Spark 作為執(zhí)行引擎,讓用戶在離線場(chǎng)景中可以得到潛在的更好的吞吐性能,根據(jù)場(chǎng)景自由選擇最合適的執(zhí)行引擎。
④ 提供執(zhí)行引擎的擴(kuò)展能力。FeatHub 不僅可以支持以 Flink、Spark 作為執(zhí)行引擎,還支持開(kāi)發(fā)者自定義執(zhí)行引擎,使用公司內(nèi)部自研的執(zhí)行引擎進(jìn)行特征 ETL。
⑤ 代碼開(kāi)源,使得用戶可以自由選擇部署 FeatHub 的云廠商,也可以在私有云中進(jìn)行部署。
二、FeatHub 架構(gòu)與核心概念
1、架構(gòu)
以上是包含 FeatHub 主要模塊的架構(gòu)圖。最上層提供了一套 Python SDK,支持用戶定義數(shù)據(jù)源、數(shù)據(jù)終點(diǎn)以及特征計(jì)算邏輯。由 SDK 所定義的特征可以注冊(cè)到特征元數(shù)據(jù)中心,支持其他用戶和作業(yè)來(lái)查詢和復(fù)用特征,甚至可以基于特征元數(shù)據(jù)進(jìn)一步分析特征血緣。特征定義包含了特征的 source、sink,以及常見(jiàn)的計(jì)算邏輯,例如 UDF 調(diào)用、特征拼接,基于 over 窗口與滑動(dòng)窗口的聚合等。當(dāng)需要取生成用戶所定義的特征時(shí),F(xiàn)eatHub 會(huì)提供一些內(nèi)置的 Feature Processor,也就是執(zhí)行引擎,去執(zhí)行已有特征的計(jì)算邏輯。當(dāng)用戶需要在單機(jī)上做實(shí)驗(yàn)時(shí),可以使用 Local Processor 使用單機(jī)上的資源,無(wú)需對(duì)接一個(gè)遠(yuǎn)程的集群。當(dāng)需要生成實(shí)時(shí)特征時(shí),可以使用 Flink Processor 完成高吞吐、低延時(shí)的流式特征計(jì)算。
將來(lái)也可以支持類似于 Lambda Function 的 Feature Service 來(lái)實(shí)現(xiàn)在線的特征計(jì)算,以及對(duì)接 Spark 來(lái)完成高吞吐的離線特征計(jì)算。執(zhí)行引擎可以對(duì)接不同的離線和在線特征儲(chǔ)存系統(tǒng),例如用 Redis 完成在線特征儲(chǔ)存,用 HDFS 完成離線特征儲(chǔ)存,以及用 Kafka 完成近線特征儲(chǔ)存。
上圖展現(xiàn)了 FeatHub 如何被用戶使用,以及對(duì)接下游的機(jī)器學(xué)習(xí)訓(xùn)練和推理程序,用戶或開(kāi)發(fā)者將通過(guò) SDK 來(lái)表達(dá)所希望計(jì)算的特征,然后提交到執(zhí)行引擎上進(jìn)行部署。特征經(jīng)過(guò)計(jì)算后,需要輸出到特征儲(chǔ)存,例如 Redis 和 HDFS。一個(gè)機(jī)器學(xué)習(xí)離線訓(xùn)練程序可以直接讀取 HDFS 中的數(shù)據(jù)去做批量訓(xùn)練。一個(gè)在線的機(jī)器學(xué)習(xí)推理程序可以直接讀取 Redis 中的數(shù)據(jù)進(jìn)行在線推理。
2、核心概念
上圖展現(xiàn)了 FeatHub 中的核心概念之間的關(guān)系。一個(gè) TableDescriptor 表達(dá)一組特征的集合。TableDescriptor 經(jīng)過(guò)邏輯轉(zhuǎn)換可以生產(chǎn)一個(gè)新的 TableDescriptor。
TableDescriptor 分為兩類。其中 FeatureTable 表達(dá)的是具有特定物理地址的表,例如可以是一個(gè)在 Redis 中的表,也可以是一個(gè)在 HDFS 中的表。FeatureView 則是一些不一定有物理地址的邏輯表,通常是從一個(gè) FeatureTable 經(jīng)過(guò)一連邏輯串轉(zhuǎn)換后得到的。
FeatureView 有如下 3 個(gè)子類:
① DerivedFeatureView 輸出的特征表和其輸入的特征表(i.e. source)的行基本是一對(duì)一的。它可以支持表達(dá)單行轉(zhuǎn)換邏輯(e.g. 加減乘除),over window 聚合邏輯,以及特征拼接邏輯。它可用于生成訓(xùn)練數(shù)據(jù)。例如在之前所介紹的例子中,需要將訓(xùn)練樣本去拼接來(lái)自不同維表的特征以得到實(shí)際的訓(xùn)練數(shù)據(jù),就可以使用 DerivedFeatureView 來(lái)完成。
② SlidingFeatureView 支持表達(dá)由滑動(dòng)窗口計(jì)算得到的特征。它輸出的特征表和其輸入的特征表的行不一定是一對(duì)一的。這是因?yàn)榧词箾](méi)有新的輸入,滑動(dòng)窗口計(jì)算得到的特征數(shù)值會(huì)隨著時(shí)間流逝而變化。SlidingFeatureView 可以用于維護(hù)實(shí)時(shí)生成的特征,并輸出到在線特征存儲(chǔ),例如 Redis,用于在線推理。例如,我們可以用 SlidingFeatureView 去計(jì)算每個(gè)用戶最近兩分鐘內(nèi)點(diǎn)擊某個(gè)網(wǎng)頁(yè)的次數(shù),并將特征數(shù)值實(shí)時(shí)更新到 Redis 中,然后廣告推薦鏈路就可以在線查詢這個(gè)特征的值來(lái)做在線推理。
③ OnDemandFeatureView 可以與 Feature Service 用在一起,支持在線特征計(jì)算。例如在使用高德地圖時(shí),開(kāi)發(fā)者可能會(huì)希望在收到用戶的請(qǐng)求之后,根據(jù)用戶當(dāng)前的物理位置與上一次發(fā)送請(qǐng)求時(shí)的物理位置,計(jì)算出用戶移動(dòng)的速度和方向速度,來(lái)協(xié)助推薦路線的決策。這些特征必須在收到用戶請(qǐng)求的時(shí)候進(jìn)行在線計(jì)算得到。OnDemandFeatureView 可以用于支持這類場(chǎng)景。
Transform 表達(dá)的是特征計(jì)算邏輯。FeatHub 當(dāng)前支持如下 5 種特征計(jì)算邏輯:
① Expression 支持用戶基于一個(gè) DSL 語(yǔ)言表達(dá)單行的特征計(jì)算邏輯。其表達(dá)能力接近SQL 語(yǔ)言中的 select 語(yǔ)句,可以支持加減乘除和內(nèi)置函數(shù)調(diào)用,可以讓熟悉 SQL 的開(kāi)發(fā)者快速上手。
② Join 表達(dá)的是特征拼接邏輯。開(kāi)發(fā)者可以指定維表的名字和需要拼接的特征名字等信息。
③ PythonUDF 支持用戶自定義 Python 函數(shù)來(lái)計(jì)算特征。
④ OverWindow 表達(dá)的是 Over 窗口聚合邏輯。例如在收到一行數(shù)據(jù)時(shí),用戶希望根據(jù)之前的 5 行數(shù)據(jù),進(jìn)行聚合并計(jì)算有多少條數(shù)據(jù)符合某個(gè)規(guī)則。
⑤ SlidingWindow 表達(dá)的是滑動(dòng)窗口聚合邏輯。
從上圖中可以看到,通常一個(gè)特征 ETL 作業(yè)會(huì)從特征源表讀取特征,經(jīng)過(guò)多次特征計(jì)算邏輯產(chǎn)生新的特征,并將生成的特征輸出到特征結(jié)果表。特征源表可以對(duì)接不同的特征存儲(chǔ),例如有 FileSystem,Kafka,Hive 等。類似的,特征結(jié)果表也可以對(duì)接 FileSystem,Kafka,Redis 等特征儲(chǔ)存。
Processor 包括 LocalProcessor、FlinkProcessor、SparkProcessor,分別可以使用單機(jī)物理資源,分布式的 Flink 集群,以及分布式 Spark 集群,去執(zhí)行用戶所定義的特征計(jì)算邏輯。
三、FeatHub API 展示
1、特征計(jì)算功能
在介紹了 FeatHub 的架構(gòu)和核心概念后,我們將通過(guò)一些樣例程序來(lái)展現(xiàn) FeatHub SDK 的表達(dá)能力以及易用性。對(duì)于特征開(kāi)發(fā) SDK 來(lái)說(shuō),其最核心的能力就是如何表達(dá)新的特征計(jì)算邏輯。FeatHub SDK 支持特征拼接、窗口聚合、內(nèi)置函數(shù)調(diào)用以及自定義 Python 等能力,將來(lái)還可以支持基于 JAVA 或者 C++ 的 UDF 調(diào)用。
上圖展示了一個(gè)特征拼接的代碼片段。在這個(gè)例子中,假設(shè) HDFS 中有原始的正負(fù)樣本數(shù)據(jù),記錄了用戶購(gòu)買商品的行為。我們想進(jìn)一步想獲取用戶在購(gòu)買每個(gè)商品時(shí)的商品價(jià)格。一個(gè) price_updates 表維護(hù)了商品價(jià)格變化的數(shù)據(jù)。每次商品價(jià)格變化時(shí),會(huì)在 price_updates 表中產(chǎn)生一行數(shù)據(jù),包含商品 ID 和最新的商品價(jià)格。我們可以使用 JoinTransform,設(shè)置 table_name=price_updates,feature_name=price,以及 key=item_id,來(lái)表達(dá)相應(yīng)的特征拼接邏輯。這樣 FeatHub 就可以根據(jù)在 price_updates 中,找到具有給定 item_id 的行,并根據(jù)時(shí)間戳,找到最合適的 price 數(shù)值,來(lái)拼接到樣本數(shù)據(jù)表上。
Over 窗口聚合的代碼片段則展示了如何用 OverWindowTransform 來(lái)計(jì)算特征。用戶可以使用 expr=”item_counts * price”,以及 agg_fun=”SUM”,來(lái)根據(jù)購(gòu)買的商品數(shù)量和價(jià)格,計(jì)算出最近時(shí)間窗口中的總消費(fèi)量。其中窗口的時(shí)間長(zhǎng)度為 2 分鐘。group_by_keys=[“user_id”] 則說(shuō)明了我們會(huì)為每個(gè)用戶單獨(dú)計(jì)算出對(duì)應(yīng)的總消費(fèi)量。
?滑動(dòng)窗口聚合與 Over 窗口聚合比較類似,API 上唯一區(qū)別是可以額外指定 step_size。如果 step_size=1 分鐘,則窗口會(huì)在每分鐘進(jìn)行滑動(dòng)并產(chǎn)生新的特征值。
內(nèi)置函數(shù)調(diào)用的代碼片段展示了如何使用 D?SL 語(yǔ)言表達(dá)加減乘除和 UDF 調(diào)用。假設(shè)輸入的數(shù)據(jù)包含出租車接送乘客的時(shí)間戳。我們可以通過(guò)調(diào)用 UNIX_TIMESTAMP 內(nèi)置函數(shù)將接送乘客的時(shí)間戳轉(zhuǎn)換為整數(shù)類型的 epoch time,然后將得到的 epoch time 相減,得到每次旅程的時(shí)間長(zhǎng)度,作為一個(gè)特征用于之后的訓(xùn)練和推理。
在 PythonUDF 調(diào)用的代碼片段中,用戶可以自定義一個(gè) Python 函數(shù),對(duì)輸入的特征進(jìn)行任意的處理,例如產(chǎn)生小寫的字符串。
通過(guò)以上幾個(gè)代碼片段,我們可以看出 FeatHub 的 API 是比較簡(jiǎn)潔易用的。用戶只需要設(shè)置計(jì)算邏輯所必須的參數(shù),而無(wú)需了解處理引擎的細(xì)節(jié)。
2、樣例場(chǎng)景
在以上樣例場(chǎng)景中,用戶有兩個(gè)數(shù)據(jù)源。其 Purchase Events 包含用戶購(gòu)買商品的樣本數(shù)據(jù),可以來(lái)自于 Kafka,也可以來(lái)自于 FileSystem;Item Price Events 包含商品價(jià)格變動(dòng)的數(shù)據(jù)。每次商品價(jià)格變化時(shí),會(huì)在 Item Price Events 中產(chǎn)生一行數(shù)據(jù),包含商品 ID 和最新的商品價(jià)格。我們希望對(duì)于每條用戶購(gòu)買商品的樣本數(shù)據(jù),計(jì)算用戶在該行為發(fā)生時(shí)最近兩分鐘內(nèi)的消費(fèi)總量,作為特征來(lái)協(xié)助推理出用戶會(huì)不會(huì)購(gòu)買某樣商品。為了生成這個(gè)特征,可以使用上圖中所描述的計(jì)算邏輯,先將 Item Price Events 中的 price 特征以 item_id 作為 join_key 拼接到 Purchase Events 上。然后再基于時(shí)間窗口和使用 user_id 作為 group_by _keys 進(jìn)行聚合,來(lái)計(jì)算得到每個(gè)用戶最近兩分鐘內(nèi)的消費(fèi)總量。
3、樣例代碼
以上代碼片段展示了一個(gè)樣例 FeatHub 應(yīng)用所需要完成的步驟。
① 首先用戶需要?jiǎng)?chuàng)建一個(gè) FeatHubClient 并設(shè)置 processor_type。如果是本地實(shí)驗(yàn),可以設(shè)置成 Local,如果是遠(yuǎn)程分布式生產(chǎn)部署,可以設(shè)置成 Flink。
② 用戶需要?jiǎng)?chuàng)建 Source 來(lái)讀取數(shù)據(jù),例如可以使用 FileSystemSource 讀取在離線儲(chǔ)存系統(tǒng)中的數(shù)據(jù),或者使用 KafkaSource 讀取近線儲(chǔ)存系統(tǒng)中的實(shí)時(shí)數(shù)據(jù)。FileSystemSource 中,用戶可以指定例如 data_format,schema、文件的位置等信息。值得注意的是,用戶可以提供 time_stamp_field 和 time_stamp_format,分別表達(dá)數(shù)據(jù)源表中代表時(shí)間的列以及對(duì)應(yīng)的解析格式。FeatHub 將使用這些信息完成做 point-in-time correct 的特征計(jì)算,避免特征穿越的問(wèn)題。
③ 用戶可以創(chuàng)建一個(gè) FeatureView 來(lái)表達(dá)特征拼接和聚合的邏輯。如果要做拼接,用戶可以 item_price_events.price 來(lái)表達(dá)希望拼接的特征。FeatHub 會(huì)找到名字為 item_price_events 的表并從中拿到名字為 price 的特征。用戶還可以使用 OverWindowTransform 來(lái)完成 Over 窗口聚合,定義一個(gè)名為total_payment_last_two_minutes 的特征。其中 window_size=2 分鐘表示對(duì)于兩分鐘內(nèi)的數(shù)據(jù)應(yīng)用指定的表達(dá)式和聚合函數(shù)來(lái)計(jì)算特征。
④ 對(duì)于已經(jīng)定義的 FeatureView,如果用戶想做本地開(kāi)發(fā)和實(shí)驗(yàn),并使用 scikit-learn 算法庫(kù)進(jìn)行單機(jī)上的訓(xùn)練,可以使用 to_pandas() API 來(lái)將數(shù)據(jù)以 Pandas DataFrame 格式獲取到單機(jī)的內(nèi)存中。
⑤ 當(dāng)用戶需要完成特征的生產(chǎn)部署時(shí),可以使用 FileSystemSink 指定用于存放數(shù)據(jù)的離線特征儲(chǔ)存。然后調(diào)用 execute_insert() 將特征輸出到所指定的 Sink 當(dāng)中。
FeatHub 的基本價(jià)值是提供 SDK 來(lái)方便用戶開(kāi)發(fā)特征,并且提供執(zhí)行引擎來(lái)計(jì)算特征。除此之外,F(xiàn)eatHub 還將提供執(zhí)行引擎的性能優(yōu)化,讓用戶在特征部署階段獲得更多的收益。例如對(duì)于基于滑動(dòng)窗口聚合的特征,目前如果使用原生的 Flink API 來(lái)計(jì)算,F(xiàn)link 會(huì)在每個(gè)滑動(dòng)的 step_size 都輸出對(duì)應(yīng)的特征值,無(wú)論特征的數(shù)值是否發(fā)生了變化。對(duì)于 window_size=1 小時(shí),step_size=1 秒這樣的滑動(dòng)窗口,大部分情況下 Flink 可能會(huì)輸出相同的特征數(shù)值。這樣會(huì)浪費(fèi)網(wǎng)絡(luò)流量、下游存儲(chǔ)等資源。FeatHub 中支持用戶配置滑動(dòng)窗口的行為,允許滑動(dòng)窗口只在特征數(shù)值發(fā)生變化的時(shí)候輸出特征,來(lái)優(yōu)化特征計(jì)算作業(yè)的資源使用量。
另外 FeatHub 還將進(jìn)一步優(yōu)化滑動(dòng)窗口的內(nèi)存和 CPU 使用量。在某些場(chǎng)景中,用戶會(huì)定于許多類似的滑動(dòng)窗口特征。這些特征只有 window size 不一樣。例如我們可能希望得到每個(gè)用戶最近 1 分鐘,5 分鐘,和 10 分鐘內(nèi)的購(gòu)買商品的花費(fèi)總數(shù)。如果使用原生的 Flink API 來(lái)計(jì)算,作業(yè)可能會(huì)使用三個(gè)聚合算子來(lái)分別計(jì)算這 3 個(gè)特征。每個(gè)聚合算子會(huì)有單獨(dú)的內(nèi)存空間??紤]到這些算子所處理的數(shù)據(jù)和計(jì)算邏輯具有較大的重合,F(xiàn)eatHub 可以用一個(gè)自定義算子,統(tǒng)一完成這些特征的計(jì)算,來(lái)達(dá)到節(jié)約內(nèi)存和 CPU 資源的目標(biāo)。
FeatHub 目前已經(jīng)在 GitHub 開(kāi)源,能夠支持一些基本的 LocalProcessor 和 FlinkProcessor 的功能。我們會(huì)進(jìn)一步完善 FeatHub 的核心功能來(lái)方便用戶特征工程的開(kāi)發(fā)和落地。其中包括支持更多常用的離線儲(chǔ)存、在線存儲(chǔ),對(duì)接 Notebook,提供 Web UI 來(lái)可視化特征的元數(shù)據(jù),支持用戶做特征的注冊(cè)、搜索、復(fù)用,以及支持使用 Spark 作為 FeatHub 的執(zhí)行引擎。
FeatHub 代碼庫(kù):?https://github.com/alibaba/FeatHub?
FeatHub 代碼樣例:?https://github.com/flink-extended/FeatHub-examples?
FeatHub 代碼庫(kù)目前放在 github/alibaba 目錄下。為了方便大家學(xué)習(xí)使用 FeatHub,并快速找到和參照滿足所需場(chǎng)景需求的代碼片段,我們?cè)?flink-extended/feathub-examples 代碼庫(kù)中提供額外代碼示例,大家可以自由使用嘗試。歡迎大家提供反饋,以及貢獻(xiàn) PR。
四、問(wèn)答環(huán)節(jié)
Q1:在 point_in_time join 時(shí),特征穿越是由數(shù)據(jù)亂序延遲還是由人工寫 Join 時(shí)導(dǎo)致的?
A1:原則上都有,即使數(shù)據(jù)沒(méi)有亂序,如果在 Join 時(shí)沒(méi)有考慮到 timestamp 字段,就可能導(dǎo)致亂序。在實(shí)際場(chǎng)景中,源數(shù)據(jù)可能也會(huì)亂序。這時(shí)候可以使用類似于 Flink 中的 watermark 策略來(lái)等待晚到的數(shù)據(jù),降低亂序的影響。另外我們可以用定期的離線作業(yè)來(lái) backfill 在線特征數(shù)據(jù),從而進(jìn)一步降低數(shù)據(jù)亂序的影響。
Q2:FeatHub 上線后,如何去產(chǎn)出過(guò)去的訓(xùn)練數(shù)據(jù)及其對(duì)應(yīng)的特征?FeatHub 是否支持歷史數(shù)據(jù)的回放?
A2:FeatHub API 是能支持回放的, 但目前這部分功能還沒(méi)有經(jīng)過(guò)生產(chǎn)驗(yàn)證。FeatHub 將支持使用 Flink 和 Spark 作為執(zhí)行引擎,因此可以復(fù)用 Flink 和 Spark 的計(jì)算能力來(lái)完成歷史數(shù)據(jù)的回放。例如, 我們可以啟動(dòng)一個(gè) Spark 作業(yè),設(shè)置 Source 來(lái)處理過(guò)去一個(gè)月內(nèi)所有的 HDFS 上的數(shù)據(jù),并執(zhí)行所定義的特征拼接和聚合邏輯,然后將計(jì)算得到的特征輸出。
Q3. FeatHub 只負(fù)責(zé)離線特征計(jì)算,如何處理在線部分特征?
A3:特征計(jì)算分為離線、近線和在線,F(xiàn)link 是一個(gè)近線執(zhí)行引擎,可以實(shí)時(shí)計(jì)算例如最近 5 分鐘內(nèi)的用戶點(diǎn)擊次數(shù)這樣的特征,同時(shí)也可以支持離線計(jì)算。因此 FeatHub 可以支持離線和近線特征計(jì)算。FeatHub 將來(lái)有計(jì)劃去支持在線特征計(jì)算,使用基于 Feature Service 的架構(gòu),來(lái)計(jì)算 OnDemandFeatureView 所表達(dá)的特征。
Q4:FeatHub 在阿里云中提供服務(wù),目前有哪些上下游的生態(tài)支持,比如 ODPS 等?
A4:FeatHub 將會(huì)支持所有 Flink 所支持的 Source/Sink,包括 ODPS,Holo 等阿里云提供的服務(wù)。目前 FeatHub 只支持 Kafka 和 FileSystem。我們會(huì)逐步添加更多的儲(chǔ)存支持。