成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Apache Flink在bilibili的多元化探索與實(shí)踐

大數(shù)據(jù)
本文由 bilibili 大數(shù)據(jù)實(shí)時(shí)平臺(tái)負(fù)責(zé)人鄭志升分享,本次分享核心講解萬億級(jí)傳輸分發(fā)架構(gòu)的落地,以及 AI 領(lǐng)域如何基于 Flink 打造一套完善的預(yù)處理實(shí)時(shí) Pipeline。本次分享主要圍繞以下四個(gè)方面:一、B 站實(shí)時(shí)的前世與今生二、Flink On Yarn 的增量化管道的方案三、Flink 和 AI 方向的一些工程實(shí)踐四、未來的發(fā)展與思考

本文由 bilibili 大數(shù)據(jù)實(shí)時(shí)平臺(tái)負(fù)責(zé)人鄭志升分享,本次分享核心講解萬億級(jí)傳輸分發(fā)架構(gòu)的落地,以及 AI 領(lǐng)域如何基于 Flink 打造一套完善的預(yù)處理實(shí)時(shí) Pipeline。

本次分享主要圍繞以下四個(gè)方面:

一、B 站實(shí)時(shí)的前世與今生

二、Flink On Yarn 的增量化管道的方案

三、Flink 和 AI 方向的一些工程實(shí)踐

四、未來的發(fā)展與思考

一、B 站實(shí)時(shí)的前世與今生

1. 生態(tài)場景輻射

說起實(shí)時(shí)計(jì)算的未來,關(guān)鍵詞就在于數(shù)據(jù)的實(shí)效性。首先從整個(gè)大數(shù)據(jù)發(fā)展的生態(tài)上,來看它的核心場景輻射:在大數(shù)據(jù)發(fā)展的初期,核心是以面向天為粒度的離線計(jì)算的場景。 那時(shí)候的數(shù)據(jù)實(shí)效性多數(shù)都是以運(yùn)算以天為單位,它更加注重時(shí)間和成本的平衡。

隨著數(shù)據(jù)應(yīng)用,數(shù)據(jù)分析以及數(shù)據(jù)倉庫的普及與完善,越來越多的人對(duì)數(shù)據(jù)的實(shí)效性提出了更高的要求。比如,當(dāng)需要做一些數(shù)據(jù)的實(shí)時(shí)推薦時(shí),數(shù)據(jù)的實(shí)效將決定它的價(jià)值。在這種情況下,整個(gè)實(shí)時(shí)計(jì)算的場景就普遍誕生。

但在實(shí)際的運(yùn)作過程當(dāng)中,也遇到了很多場景 ,其實(shí)并沒有對(duì)數(shù)據(jù)有非常高的實(shí)時(shí)性要求,在這種情況下必然會(huì)存在數(shù)據(jù)從毫秒,秒或者天的新的一些場景,實(shí)時(shí)場景數(shù)據(jù)更多是以分鐘為粒度的一些增量計(jì)算的場景。對(duì)于離線計(jì)算,它更加注重成本;對(duì)實(shí)時(shí)計(jì)算,它更加注重價(jià)值實(shí)效;而對(duì)于增量計(jì)算,它更加注重去平衡成本,以及綜合的價(jià)值和時(shí)間。

2. B 站的時(shí)效性

在三個(gè)維度上,B 站的劃分是怎樣的?對(duì)于 B 站而言 ,目前有 75% 的數(shù)據(jù)是通過離線計(jì)算來進(jìn)行支撐的,另外還有 20% 的場景是通過實(shí)時(shí)計(jì)算, 5% 是通過增量計(jì)算。

對(duì)于實(shí)時(shí)計(jì)算的場景, 主要是應(yīng)用在整個(gè)實(shí)時(shí)的機(jī)器學(xué)習(xí)、實(shí)時(shí)推薦、廣告搜索、數(shù)據(jù)應(yīng)用、實(shí)時(shí)渠道分析投放、報(bào)表、olap、監(jiān)控等;對(duì)于離線計(jì)算,數(shù)據(jù)輻射面廣,主要以數(shù)倉為主;對(duì)于增量計(jì)算,今年才啟動(dòng)一些新的場景,比如說 binlog 的增量 Upsert 場景。

3. ETL 時(shí)效性差

對(duì)于實(shí)效性問題 ,其實(shí)早期遇到了很多痛點(diǎn) ,核心集中在三個(gè)方面:

第一,傳輸管道缺乏計(jì)算能力。早期的方案,數(shù)據(jù)基本都是要按天落到 ODS ,DW 層是凌晨過后的第二天去掃描前一天所有 ODS 層的數(shù)據(jù),也就是說,整體數(shù)據(jù)沒辦法前置清洗;第二,含有大量作業(yè)的資源集中爆發(fā)在凌晨之后,整個(gè)資源編排的壓力就會(huì)非常大;第三、實(shí)時(shí)和離線的 gap 是比較難滿足的,因?yàn)閷?duì)于大部分的數(shù)據(jù)來說,純實(shí)時(shí)的成本過高,純離線的實(shí)效又太差。同時(shí),MySQL 數(shù)據(jù)的入倉時(shí)效也不太夠。舉個(gè)例子,好比 B 站的彈幕數(shù)據(jù) ,它的體量非常夸張,這種業(yè)務(wù)表的同步往往需要十幾個(gè)小時(shí),而且非常的不穩(wěn)定。

4. AI 實(shí)時(shí)工程復(fù)雜

除了實(shí)效性的問題 早期還遇到了 AI 實(shí)時(shí)工程比較復(fù)雜的問題:

第一,是整個(gè)特征工程計(jì)算效率的問題。同樣的實(shí)時(shí)特征的計(jì)算場景, 也需要在離線的場景上進(jìn)行數(shù)據(jù)的回溯,計(jì)算邏輯就會(huì)重復(fù)開發(fā);第二,整個(gè)實(shí)時(shí)鏈路比較長。一個(gè)完整的實(shí)時(shí)推薦鏈路, 涵蓋了 N 個(gè)實(shí)時(shí)和 M 個(gè)離線的十幾個(gè)作業(yè)組成,有時(shí)候遇到問題排查,整個(gè)鏈路的運(yùn)維和管控成本都非常高;第三、隨著 AI 人員的增多,算法人員的投入,實(shí)驗(yàn)迭代很難橫向擴(kuò)展。

5. Flink 做了生態(tài)化的實(shí)踐

在這些關(guān)鍵痛點(diǎn)的背景下,我們集中針對(duì) Flink 做了生態(tài)化的實(shí)踐,核心包括了整個(gè)實(shí)時(shí)數(shù)倉的應(yīng)用以及整個(gè)增量化的 ETL 管道,還有面向 AI 的機(jī)器學(xué)習(xí)的一些場景。本次的分享會(huì)更加側(cè)重增量管道以及 AI 加 Flink 的方向上。下圖展示了整體的規(guī)模,目前,整個(gè)傳輸和計(jì)算的體量,在萬億級(jí)的消息規(guī)模有 30000+ 計(jì)算核數(shù),1000+ job 數(shù)以及 100 多個(gè)用戶。

二、Flink On Yarn 的增量化管道的方案

1. 早期的架構(gòu)

先來看一下整個(gè)管道早期的架構(gòu),從下圖可以看出,數(shù)據(jù)其實(shí)主要是通過 Flume 來消費(fèi) Kafka 落到 HDFS。Flume 用它的事務(wù)機(jī)制,來確保數(shù)據(jù)從 Source 到 Channel, 再到 Sink 時(shí)候的一致性,最后數(shù)據(jù)落到 HDFS 之后,下游的 Scheduler 會(huì)通過掃描目錄下有沒有 tmp 文件,來判斷數(shù)據(jù)是否 Ready,以此來調(diào)度拉起下游的 ETL 離線作業(yè)。

2. 痛點(diǎn)

在早期遇到了不少痛點(diǎn):

第一個(gè)比較關(guān)鍵的是數(shù)據(jù)質(zhì)量。最先用的是 MemoryChannel,它會(huì)存在數(shù)據(jù)的丟失,之后也試過用 FileChannel 的模式,但性能上無法達(dá)到要求。此外在 HDFS 不太穩(wěn)定的情況下,F(xiàn)lume 的事務(wù)機(jī)制就會(huì)導(dǎo)致數(shù)據(jù)會(huì) rollback 回滾到 Channel,一定程度上會(huì)導(dǎo)致數(shù)據(jù)不斷的重復(fù)。在 HDFS 極度不穩(wěn)定的情況下,最高的重復(fù)率會(huì)達(dá)到百分位的概率;Lzo 行存儲(chǔ),早期的整個(gè)傳輸是通過分隔符的形式,這種分隔符的 Schema 是比較弱約束的,而且也不支持嵌套的格式。第二點(diǎn)是整個(gè)數(shù)據(jù)的時(shí)效,無法提供分鐘級(jí)的查詢,因?yàn)?Flume 不像 Flink 有 Checkpoint 斬?cái)嗟臋C(jī)制,更多是通過 idle 機(jī)制來控制文件的關(guān)閉;第三點(diǎn)是下游的 ETL 聯(lián)動(dòng)。前文有提到,我們更多是通過掃描 tmp 目錄是否 ready 的方案,這種情況下 scheduler 會(huì)大量的和 NameNode 調(diào)用 hadoop list 的 api,這樣會(huì)導(dǎo)致 NameNode 的壓力比較大。

3. 穩(wěn)定性相關(guān)的痛點(diǎn)

在穩(wěn)定性上也遇到很多問題:

第一,F(xiàn)lume 是不帶狀態(tài)的,節(jié)點(diǎn)異常或者是重啟之后,tmp 沒法正常關(guān)閉;第二,早期沒有依附大數(shù)據(jù)的環(huán)境,是物理部署的模式,資源伸縮很難去把控,成本也會(huì)相對(duì)偏高;第三,F(xiàn)lume 和 HDFS 在通信上有問題。比如說當(dāng)寫 HDFS 出現(xiàn)堵塞的情況,某一個(gè)節(jié)點(diǎn)的堵塞會(huì)反壓到 Channel,就會(huì)導(dǎo)致 Source 不會(huì)去 Kafka 消費(fèi)數(shù)據(jù),停止拉動(dòng) offset,一定程度上就會(huì)引發(fā) Kafka 的 Rebalance,最后會(huì)導(dǎo)致全局 offset 不往前推進(jìn),從而導(dǎo)致數(shù)據(jù)的堆積。

4. 萬億級(jí)的增量管道 DAG 視圖

在如上的痛點(diǎn)下,核心方案基于 Flink 構(gòu)建了一套萬億級(jí)的增量管道,下圖是整個(gè)運(yùn)行時(shí)的 DAG 視圖。

首先,在 Flink 架構(gòu)下,KafkaSource 杜絕了 rebalance 的雪崩問題,即便整個(gè) DAG 視圖中有某個(gè)并發(fā)度出現(xiàn)數(shù)據(jù)寫 HDFS 的堵塞,也不會(huì)導(dǎo)致全局所有 Kafka 分區(qū)的堵塞。此外的話,整個(gè)方案本質(zhì)是通過 Transform 的模塊來實(shí)現(xiàn)可擴(kuò)展的節(jié)點(diǎn)。

第一層節(jié)點(diǎn)是 Parser,它主要是做數(shù)據(jù)的解壓反序列化等的解析操作;第二層是引入提供給用戶的定制化 ETL 模塊,它可以實(shí)現(xiàn)數(shù)據(jù)在管道中的定制清洗;第三層是 Exporter 模塊,它支持將數(shù)據(jù)導(dǎo)出到不同的存儲(chǔ)介質(zhì)。比如寫到 HDFS 時(shí),會(huì)導(dǎo)出成 parquet;寫到 Kafka,會(huì)導(dǎo)出成 pb 格式。同時(shí),在整個(gè) DAG 的鏈路上引入了 ConfigBroadcast 的模塊來解決管道元數(shù)據(jù)實(shí)時(shí)更新、熱加載的問題。此外,在整個(gè)鏈路當(dāng)中,每分鐘會(huì)進(jìn)行一次 checkpoint,針對(duì)增量的實(shí)際數(shù)據(jù)進(jìn)行 Append,這樣就可以提供分鐘級(jí)的查詢。

5. 萬億級(jí)的增量管道整體視圖

Flink On Yarn 的整體架構(gòu),可以看出其實(shí)整個(gè)管道視圖是劃分以 BU 為單位的。每個(gè) Kafka 的 topic,都代表了某一種數(shù)據(jù)終端的分發(fā),F(xiàn)link 作業(yè)就會(huì)專門負(fù)責(zé)各種終端類型的寫入處理。視圖里面還可以看到,針對(duì) blinlog 的數(shù)據(jù),還實(shí)現(xiàn)了整個(gè)管道的組裝,可以由多個(gè)節(jié)點(diǎn)來實(shí)現(xiàn)管道的運(yùn)作。

6. 技術(shù)亮點(diǎn)

接下來來看一下整個(gè)架構(gòu)方案核心的一些技術(shù)亮點(diǎn),前三個(gè)是實(shí)時(shí)功能層面的一些特色,后三個(gè)主要是在一些非功能性層面的一些優(yōu)化。

對(duì)于數(shù)據(jù)模型來說,主要是通過 parquet,利用 Protobuf 到 parquet 的映射來實(shí)現(xiàn)格式收斂;分區(qū)通知主要是因?yàn)橐粭l管道其實(shí)是處理多條流,核心解決的是多條流數(shù)據(jù)的分區(qū) ready 的通知機(jī)制;CDC 管道更多是利用 binlog 和 HUDI 來實(shí)現(xiàn) upsert 問題的解決;小文件主要是在運(yùn)行時(shí)通過 DAG 拓?fù)涞姆绞絹斫鉀Q文件合并的問題;HDFS 通信實(shí)際是在萬億級(jí)規(guī)模下的很多種關(guān)鍵問題的優(yōu)化;最后是分區(qū)容錯(cuò)的一些優(yōu)化。

6.1 數(shù)據(jù)模型

業(yè)務(wù)的開發(fā)主要是通過拼裝字符串,來組裝數(shù)據(jù)的一條條記錄的上報(bào)。后期則是通過了模型的定義和管理,以及它的開發(fā)來組織的,主要是通過在平臺(tái)的入口提供給用戶去錄制每一條流、每個(gè)表,它的 Schema ,Schema 會(huì)將它生成 Protobuf 的文件,用戶可以在平臺(tái)上去下載 Protobuf 對(duì)應(yīng)的 HDFS 模型文件,這樣,client 端的開發(fā)完全就可以通過強(qiáng) Schema 方式從 pb 來進(jìn)行約束。

來看一下運(yùn)行時(shí)的過程,首先 Kafka 的 Source 會(huì)去消費(fèi)實(shí)際上游傳過來的每一條 RawEvent 的記錄,RawEvent 里面會(huì)有 PBEvent 的對(duì)象,PBEvent 其實(shí)是一條條的 Protobuf 的記錄。數(shù)據(jù)從 Source 流到的 Parser 模塊,解析后會(huì)形成 PBEvent,PBEvent 會(huì)將用戶在平臺(tái)錄入的整個(gè) Schema 模型,存儲(chǔ)在 OSS 對(duì)象系統(tǒng)上,Exporter 模塊會(huì)動(dòng)態(tài)去加載模型的變更。然后通過 pb 文件去反射生成的具體事件對(duì)象,事件對(duì)象最后就可以映射落成 parquet 的格式。這里主要做了很多緩存反射的優(yōu)化,使整個(gè) pb 的動(dòng)態(tài)解析性能達(dá)到六倍的提升。最后,我們會(huì)將數(shù)據(jù)會(huì)落地到 HDFS,形成 parquet 的格式。

6.2 分區(qū)通知優(yōu)化

前面提到管道會(huì)處理上百條流,早期 Flume 的架構(gòu),其實(shí)每個(gè) Flume 節(jié)點(diǎn),很難去感應(yīng)它自己處理的進(jìn)度。同時(shí),F(xiàn)lume 也沒辦法做到全局進(jìn)度的處理。但是基于 Flink,就可以通過 Watermark 的機(jī)制來解決。

首先在 Source 會(huì)基于消息當(dāng)中的 Eventime 來生成 Watermark,Watermark 會(huì)經(jīng)過每一層的處理傳遞到 Sink,最后會(huì)通過 Commiter 模塊,以單線程的方式來匯總所有 Watermark 消息的進(jìn)度。當(dāng)它發(fā)現(xiàn)全局 Watermark 已經(jīng)推進(jìn)到下個(gè)小時(shí)的分區(qū)的時(shí)候,它會(huì)下發(fā)一條消息到 Hive MetStore,或者是寫入到 Kafka, 來通知上小時(shí)分區(qū)數(shù)據(jù) ready,從而可以讓下游的調(diào)度可以更快的通過消息驅(qū)動(dòng)的方式來拉起作業(yè)的運(yùn)行。

6.3 CDC管道上的優(yōu)化

下圖右側(cè)其實(shí)是整個(gè) cdc 管道完整的鏈路。要實(shí)現(xiàn) MySQL 數(shù)據(jù)到 Hive 數(shù)據(jù)的完整映射,就需要解決流和批處理的問題。

首先是通過 Datax 將 MySQL 的數(shù)據(jù)全量一次性同步到的 HDFS。緊接著通過 spark 的 job,將數(shù)據(jù)初始化成 HUDI 的初始快照,接著通過 Canal 來實(shí)現(xiàn)將 Mysql 的 binlog 的數(shù)據(jù)拖到的 Kafka 的 topic,然后是通過 Flink 的 Job 將初始化快照的數(shù)據(jù)結(jié)合增量的數(shù)據(jù)進(jìn)行增量更新,最后形成 HUDI 表。

整個(gè)鏈路是要解決數(shù)據(jù)的不丟不重,重點(diǎn)是針對(duì) Canal 寫 Kafka 這塊,開了事務(wù)的機(jī)制,保證數(shù)據(jù)落 Kafka topic 的時(shí)候,可以做到數(shù)據(jù)在傳輸過程當(dāng)中的不丟不重。另外,數(shù)據(jù)在傳輸?shù)纳蠈悠鋵?shí)也有可能出現(xiàn)數(shù)據(jù)的重復(fù)和丟失,這時(shí)候更多是通過全局唯一 id 加毫秒級(jí)的時(shí)間戳。在整個(gè)流式 Job 中,針對(duì)全局 id 來做數(shù)據(jù)的去重,針對(duì)毫秒級(jí)時(shí)間來做數(shù)據(jù)的排序,這樣能保證數(shù)據(jù)能夠有序的更新到的 HUDI。

緊接著通過 Trace 的系統(tǒng)基于 Clickhouse 來做存儲(chǔ),來統(tǒng)計(jì)各個(gè)節(jié)點(diǎn)數(shù)據(jù)的進(jìn)出條數(shù)來做到數(shù)據(jù)的精確對(duì)比。

6.4 穩(wěn)定性 - 小文件的合并

前面提到,改造成 Flink 之后,我們是做了每分鐘的 Checkpoint,文件數(shù)的放大非常嚴(yán)重。主要是在整個(gè) DAG 當(dāng)中去引入 merge 的 operater 來實(shí)現(xiàn)文件的合并,merge 的合并方式主要是基于并發(fā)度橫向合并,一個(gè) writer 會(huì)對(duì)應(yīng)一個(gè) merge。這樣每五分鐘的 Checkpoint,1 小時(shí)的 12 個(gè)文件,都會(huì)進(jìn)行合并。通過種方式的話,可以將文件數(shù)極大的控制在合理的范圍內(nèi)。

6.5 HDFS 通信

實(shí)際運(yùn)作過程當(dāng)中經(jīng)常會(huì)遇到整個(gè)作業(yè)堆積比較嚴(yán)重的問題,實(shí)際分析其實(shí)主是和 HDFS 通信有很大的關(guān)系。

其實(shí) HDFS 通訊,梳理了四個(gè)關(guān)鍵的步驟:初始化 state、Invoke、Snapshot 以及 Notify Checkpoint complete。

核心問題主要發(fā)生在 Invoke 階段,Invoke 會(huì)達(dá)到文件的滾動(dòng)條件,這時(shí)候會(huì)觸發(fā) flush 和 close。close 實(shí)際和 NameNode 通信的時(shí)候,會(huì)經(jīng)常出現(xiàn)堵塞的情況。

Snapshot 階段同樣會(huì)遇到一個(gè)問題,一個(gè)管道上百條流一旦觸發(fā) Snapshot,串行執(zhí)行 flush 和 close 也會(huì)非常的慢。

核心優(yōu)化集中在三個(gè)方面:

第一,減少了文件的斬?cái)啵簿褪?close 的頻次。在 Snapshot 階段,不會(huì)去 close 關(guān)閉文件,而更多的是通過文件續(xù)寫的方式。這樣,在初始化 state 的階段,就需要做文件的 Truncate 來做 Recovery 恢復(fù)。第二,是異步化 close 的改進(jìn),可以說是 close 的動(dòng)作不會(huì)去堵塞整個(gè)總鏈路的處理,針對(duì) Invoke 和 Snapshot 的 close,會(huì)將狀態(tài)管理到 state 當(dāng)中,通過初始化 state 來進(jìn)行文件的恢復(fù)。第三,針對(duì)多條流,Snapshot 還做了并行化的處理,每 5 分鐘的 Checkpoint, 多條流其實(shí)就是多個(gè) bucket,會(huì)通過循環(huán)來進(jìn)行串行的處理,那么通過多線程的方式來改造,就可以減少 Checkpoint timeout 的發(fā)生。

6.6 分區(qū)容錯(cuò)的一些優(yōu)化

實(shí)際在管道多條流的情況下,有些流的數(shù)據(jù)并不是每個(gè)小時(shí)都是連續(xù)的。

這種情況會(huì)帶來分區(qū),它的 Watermark 沒有辦法正常推進(jìn),引發(fā)空分區(qū)的問題。所以我們?cè)诠艿赖倪\(yùn)行過程當(dāng)中,引入 PartitionRecover 模塊,它會(huì)根據(jù) Watermark 來推進(jìn)分區(qū)的通知。針對(duì)有些流的 Watermark,如果在 ideltimeout 還沒有更新的情況下,Recover 模塊來進(jìn)行分區(qū)的追加。它會(huì)在每個(gè)分區(qū)的末尾到達(dá)的時(shí)候,加上 delay time 來掃描所有流的 Watermark,由此來進(jìn)行兜底。

在傳輸過程當(dāng)中,當(dāng) Flink 作業(yè)重啟的時(shí)候,會(huì)遇到一波僵尸的文件,我們是通過在 DAG 的 commit 的節(jié)點(diǎn),去做整個(gè)分區(qū)通知前的僵尸文件的清理刪除,來實(shí)現(xiàn)整個(gè)僵尸文件的清理,這些都屬于非功能性層面的一些優(yōu)化。

三、Flink 和 AI 方向的一些工程實(shí)踐

1. 架構(gòu)演進(jìn)時(shí)間表

下圖是 AI 方向在實(shí)時(shí)架構(gòu)完整的時(shí)間線。

早在 2018 年,很多算法人員的實(shí)驗(yàn)開發(fā)都是作坊式的。每個(gè)算法人員會(huì)根據(jù)自己熟悉的語言,比如說 Python,php 或 c++ 來選擇不同的語言來開發(fā)不同的實(shí)驗(yàn)工程。它的維護(hù)成本非常大,而且容易出現(xiàn)故障;2019 年上半年,主要是基于 Flink 提供了 jar 包的模式來面向整個(gè)算法做一些工程的支持,可以說在整個(gè)上半年的初期,其實(shí)更多是圍繞穩(wěn)定性,通用性來做一些支持;2019 年的下半年,是通過自研的 BSQL,大大降低了模型訓(xùn)練的門檻,解決 label 以及 instance 的實(shí)時(shí)化來提高整個(gè)實(shí)驗(yàn)迭代的效率;2020 年上半年,更多是圍繞整個(gè)特征的計(jì)算,流批計(jì)算打通以及特征工程效率的提升,來做一些改進(jìn);到2020 年的下半年,更多是圍繞整個(gè)實(shí)驗(yàn)的流程化以及引入 AIFlow,方便的去做流批 DAG。

2. AI 工程架構(gòu)回顧

回顧一下整個(gè) AI 工程,它的早期的架構(gòu)圖其實(shí)體現(xiàn)的是整個(gè) AI 在 2019 年初的架構(gòu)視圖,其本質(zhì)是通過一些 single task 的方式,各種混合語言來組成的一些計(jì)算節(jié)點(diǎn),來支撐著整個(gè)模型訓(xùn)練的鏈路拉起。經(jīng)過 2019 年的迭代,將整個(gè)近線的訓(xùn)練完全的替換成用 BSQL 的模式來進(jìn)行開發(fā)和迭代。

3. 現(xiàn)狀痛點(diǎn)

在 2019 年底,其實(shí)又遇到了一些新的問題,這些問題主要集中在功能和非功能兩個(gè)維度上。

在功能層面:首先從 label 轉(zhuǎn)到產(chǎn)生 instance 流,以及到模型訓(xùn)練,到線上預(yù)測,乃至真正的實(shí)驗(yàn)效果,整個(gè)鏈路非常的長且復(fù)雜;第二,整個(gè)實(shí)時(shí)的特征、離線特征、以及流批的一體,涉及到非常多的作業(yè)組成,整個(gè)鏈路很復(fù)雜。同時(shí)實(shí)驗(yàn)和 online 都要做特征的計(jì)算,結(jié)果不一致會(huì)導(dǎo)致最終的效果出現(xiàn)問題。此外,特征存在哪里也不好找,沒辦法去追溯。

在非功能性層面,算法的同學(xué)經(jīng)常會(huì)遇到,不知道 Checkpoint 是什么,要不要開,有啥配置。此外,線上出問題的時(shí)候也不好排查,整個(gè)鏈路都非常的長。所以第三點(diǎn)就是,完整的實(shí)驗(yàn)進(jìn)度需要涉及的資源是非常多的,但是對(duì)算法來說它根本就不知道這些資源是什么以及需要多少,這些問題其實(shí)都都對(duì)算法產(chǎn)生很大的困惑。

4. 痛點(diǎn)歸結(jié)

歸根結(jié)底,集中在三個(gè)方面:

第一是一致性的問題。從數(shù)據(jù)的預(yù)處理,到模型訓(xùn)練,再到預(yù)測,各個(gè)環(huán)節(jié)其實(shí)是斷層的。當(dāng)中包括數(shù)據(jù)的不一致,也包括計(jì)算邏輯的不一致;第二,整個(gè)實(shí)驗(yàn)迭代非常慢。一個(gè)完整的實(shí)驗(yàn)鏈路,其實(shí)對(duì)算法同學(xué)來說,他需要掌握東西非常多。同時(shí)實(shí)驗(yàn)背后的物料沒辦法進(jìn)行共享。比如說有些特征,每個(gè)實(shí)驗(yàn)背后都要重復(fù)開發(fā);第三,是運(yùn)維和管控的成本比較高。

完整的實(shí)驗(yàn)鏈路,背后其實(shí)是包含實(shí)時(shí)的一條工程加離線的一條工程鏈路組成,線上的問題很難去排查。

5. 實(shí)時(shí) AI 工程的雛形

在這樣的一些痛點(diǎn)下,在 20 年主要是集中在 AI 方向上去打造實(shí)時(shí)工程的雛形。核心是通過下面三個(gè)方面來進(jìn)行突破。

第一是在 BSQL 的一些能力上,對(duì)于算法,希望通過面向 SQL 來開發(fā)以此降低工程投入;第二是特征工程,會(huì)通過核心解決特征計(jì)算的一些問題來滿足特征的一些支持;第三是整個(gè)實(shí)驗(yàn)的協(xié)作,算法的目的其實(shí)在于實(shí)驗(yàn),希望去打造一套端到端的實(shí)驗(yàn)協(xié)作,最終希望做到面向算法能夠“一鍵實(shí)驗(yàn)”。

6. 特征工程-難點(diǎn)

我們?cè)谔卣鞴こ讨杏龅搅艘恍╇y點(diǎn)。

第一是在實(shí)時(shí)特征計(jì)算上,因?yàn)樗枰獙⒔Y(jié)果利用到整個(gè)線上的預(yù)測服務(wù),所以它對(duì)延遲以及穩(wěn)定性的要求都非常的高;第二是整個(gè)實(shí)時(shí)和離線的計(jì)算邏輯一致,我們經(jīng)常遇到一個(gè)實(shí)時(shí)特征,它需要去回溯過去 30 天到到 60 天的離線數(shù)據(jù),怎么做到實(shí)時(shí)特征的計(jì)算邏輯能同樣在離線特征的計(jì)算上去復(fù)用;第三是整個(gè)離線特征的流批一體比較難打通。實(shí)時(shí)特征的計(jì)算邏輯經(jīng)常會(huì)帶有窗口時(shí)序等等一些流式的概念,但是離線特征是沒有這些語義的。

7. 實(shí)時(shí)特征

這里看一下我們?cè)趺慈プ鰧?shí)時(shí)特征,圖中的右側(cè)是最典型的一些場景。比如說我要實(shí)時(shí)統(tǒng)計(jì)用戶最近一分鐘、6 小時(shí)、12 小時(shí)、24 小時(shí),對(duì)各個(gè) UP 主相關(guān)視頻的播放次數(shù)。針對(duì)這樣場景,其實(shí)里面有兩個(gè)點(diǎn):

第一、它需要用到滑動(dòng)窗口來做整個(gè)用戶過去歷史的計(jì)算。此外,數(shù)據(jù)在滑動(dòng)計(jì)算過程當(dāng)中,它還需要去關(guān)聯(lián) UP 主的一些基礎(chǔ)的信息維表,來獲取 UP 主的一些視頻來統(tǒng)計(jì)他的播放次數(shù)。歸根結(jié)底,其實(shí)遇到了兩個(gè)比較大的痛。用 Flink 原生的滑動(dòng)窗口,分鐘級(jí)的滑動(dòng),會(huì)導(dǎo)致窗口比較多,性能會(huì)損耗比較大。同時(shí)細(xì)粒度的窗口也會(huì)導(dǎo)致定時(shí)器過多,清理效率比較差。第二是維表查詢,會(huì)遇到是多個(gè) key 要去查詢 HBASE 的多個(gè)對(duì)應(yīng)的 value,這種情況需要去支持?jǐn)?shù)組的并發(fā)查詢。

在兩個(gè)痛點(diǎn)下,針對(duì)滑動(dòng)窗口,主要是改造成為 Group By 的模式,加上 agg 的 UDF 的模式,將整個(gè)一小時(shí)、六小時(shí)、十二小時(shí)、二十四小時(shí)的一些窗口數(shù)據(jù),存放到整個(gè) Rocksdb 當(dāng)中。這樣通過 UDF 模式,整個(gè)數(shù)據(jù)觸發(fā)機(jī)制就可以基于 Group By 實(shí)現(xiàn)記錄級(jí)的觸發(fā),整個(gè)語義、時(shí)效性都會(huì)提升的比較大。同時(shí)在整個(gè) AGG 的 UDF 函數(shù)當(dāng)中,通過 Rocksdb 來做 state,在 UDF 當(dāng)中來維護(hù)數(shù)據(jù)的生命周期。此外還擴(kuò)展了整個(gè) SQL 實(shí)現(xiàn)了數(shù)組級(jí)別的維表查詢。最后的整個(gè)效果其實(shí)可以在實(shí)時(shí)特征的方向上,通過超大窗口的模式來支持各種計(jì)算場景。

8. 特征-離線

接下來看一下離線,左側(cè)視圖上半部分是完整的實(shí)時(shí)特征的計(jì)算鏈路,可以看出要解決同樣的一條 SQL,在離線的計(jì)算上也能夠復(fù)用,那就需要去解決相應(yīng)的一些計(jì)算的 IO 都能夠復(fù)用的問題。比如在流式上是通過 Kafka 來進(jìn)行數(shù)據(jù)的輸入,在離線上需要通過 HDFS 來做數(shù)據(jù)的輸入。在流式上是通過 KFC 或者 AVBase 等等的一些 kv 引擎來支持,在離線上就需要通過 hive 引擎來解決,歸根結(jié)底,其實(shí)需要去解決三個(gè)方面的問題:

第一,需要去模擬整個(gè)流式消費(fèi)的能力,能夠支持在離線的場景下去消費(fèi) HDFS 數(shù)據(jù);第二,需要解決 HDFS 數(shù)據(jù)在消費(fèi)過程當(dāng)中的分區(qū)有序的問題,類似 Kafka 的分區(qū)消費(fèi);第三,需要去模擬 kv 引擎維表化的消費(fèi),實(shí)現(xiàn)基于 hive 的維表消費(fèi)。還需要解決一個(gè)問題,當(dāng)從 HDFS 拉取的每一條記錄,每一條記錄其實(shí)消費(fèi) hive 表的時(shí)候都有對(duì)應(yīng)的 Snapshot,就相當(dāng)于是每一條數(shù)據(jù)的時(shí)間戳,要消費(fèi)對(duì)應(yīng)數(shù)據(jù)時(shí)間戳的分區(qū)。

9. 優(yōu)化

9.1 離線-分區(qū)有序

分區(qū)有序的方案其實(shí)主要是基于數(shù)據(jù)在落 HDFS 時(shí)候,前置做了一些改造。首先數(shù)據(jù)在落 HDFS 之前,是傳輸?shù)墓艿溃ㄟ^ Kafka 消費(fèi)數(shù)據(jù)。在 Flink 的作業(yè)從 Kafka 拉取數(shù)據(jù)之后,通過 Eventtime 去提取數(shù)據(jù)的 watermark,每一個(gè) Kafka Source 的并發(fā)度會(huì)將 watermark 匯報(bào)到 JobManager 當(dāng)中的 GlobalWatermark 模塊,GlobalAgg 會(huì)匯總來自每一個(gè)并發(fā)度 Watermark 推進(jìn)的進(jìn)度,從而去統(tǒng)計(jì) GlobalWatermark 的進(jìn)展。根據(jù) GlobalWatermark 的進(jìn)展來計(jì)算出當(dāng)中有哪些并發(fā)度的 Watermark 計(jì)算過快的問題,從而通過 GlobalAgg 下發(fā)給 Kafka Source 控制信息,Kafka Source 有些并發(fā)度過快的情況下,它的整個(gè)分區(qū)推進(jìn)就降低速度。這樣,在 HDFS Sink 模塊,在同時(shí)間片上收到的數(shù)據(jù)記錄的整個(gè) Event time 基本上有序的,最終落到 HDFS 還會(huì)在文件名上去標(biāo)識(shí)它相應(yīng)的分區(qū)以及相應(yīng)的時(shí)間片范圍。最后在 HDFS 分區(qū)目錄下,就可以實(shí)現(xiàn)數(shù)據(jù)分區(qū)的有序目錄。

9.2 離線-分區(qū)增量消費(fèi)

數(shù)據(jù)在 HDFS 增量有序之后,實(shí)現(xiàn)了 HDFStreamingSource,它會(huì)針對(duì)文件做 Fecher 分區(qū),針對(duì)每個(gè)文件都有 Fecher 的線程,且每個(gè) Fecher 線程會(huì)統(tǒng)計(jì)每一個(gè)文件。它 offset 處理了游標(biāo)的進(jìn)度,會(huì)將狀態(tài)根據(jù) Checkpoint 的過程,將它更新到的 State 當(dāng)中。

這樣就可以實(shí)現(xiàn)整個(gè)文件消費(fèi)的有序推進(jìn)。在回溯歷史數(shù)據(jù)的時(shí)候,離線作業(yè)就會(huì)涉及到整個(gè)作業(yè)的停止。實(shí)際是在整個(gè) FileFetcher 的模塊當(dāng)中去引入一個(gè)分區(qū)結(jié)束的標(biāo)識(shí),且會(huì)在每一個(gè)線程去統(tǒng)計(jì)每一個(gè)分區(qū)的時(shí)候,去感應(yīng)它分區(qū)的結(jié)束,分區(qū)結(jié)束后的狀態(tài)最后匯總到的 cancellationManager,并進(jìn)一步會(huì)匯總到 Job Manager 去更新全局分區(qū)的進(jìn)度,當(dāng)全局所有的分區(qū)都到了末尾的游標(biāo)時(shí)候,會(huì)將整個(gè) Flink 作業(yè)進(jìn)行 cancel 關(guān)閉掉。

9.3 離線 - Snapshot 維表

前面講到整個(gè)離線數(shù)據(jù),其實(shí)數(shù)據(jù)都在 hive 上,hive 的 HDFS 表數(shù)據(jù)的整個(gè)表字段信息會(huì)非常的多,但實(shí)際做離線特征的時(shí)候,需要的信息其實(shí)是很少的,因此需要在 hive 的過程先做離線字段裁剪,將一張 ODS 的表清洗成 DW 的表,DW 的表會(huì)最后通過 Flink 運(yùn)行 Job,內(nèi)部會(huì)有個(gè) reload 的 scheduler,它會(huì)定期的去根據(jù)數(shù)據(jù)當(dāng)前推進(jìn)的 Watermark 的分區(qū),去拉取在 hive 當(dāng)中每一個(gè)分區(qū)對(duì)應(yīng)的表信息。通過去下載某 HDFS 的 hive 目錄當(dāng)中的一些數(shù)據(jù),最后會(huì)在整個(gè)內(nèi)存當(dāng)中 reload 成 Rocksdb 的文件,Rocksdb 其實(shí)就是最后用來提供維表 KV 查詢的組件。

組件里面會(huì)包含多個(gè) Rocksdb 的 build 構(gòu)建過程,主要是取決于整個(gè)數(shù)據(jù)流動(dòng)的過程當(dāng)中的 Eventtime,如果發(fā)現(xiàn) Eventtime 推進(jìn)已經(jīng)快到小時(shí)分區(qū)結(jié)束的末尾時(shí)候,會(huì)通過懶加載的模式去主動(dòng) reload,構(gòu)建下一個(gè)小時(shí) Rocksdb 的分區(qū),通過這種方式,來切換整個(gè) Rocksdb 的讀取。

10. 實(shí)驗(yàn)流批一體

在上面三個(gè)優(yōu)化,也就是分區(qū)有序增量,類 Kafka 分區(qū) Fetch 消費(fèi),以及維表 Snapshot 的基礎(chǔ)下,最終是實(shí)現(xiàn)了實(shí)時(shí)特征和離線特征,共用一套 SQL 的方案,打通了特征的流批計(jì)算。緊接著來看一下整個(gè)實(shí)驗(yàn),完整的流批一體的鏈路,從圖中可以看出最上面的粒度是整個(gè)離線的完整的計(jì)算過程。第二是整個(gè)近線的過程,離線過程其實(shí)所用計(jì)算的語義都是和近線過程用實(shí)時(shí)消費(fèi)的語義是完全一致的,都是用 Flink 來提供 SQL 計(jì)算的。

來看一下近線,其實(shí) Label join 用的是 Kafka 的一條點(diǎn)擊流以及展現(xiàn)流,到了整個(gè)離線的計(jì)算鏈路,則用的一條 HDFS 點(diǎn)擊的目錄和 HDFS 展現(xiàn)目錄。特征數(shù)據(jù)處理也是一樣的,實(shí)時(shí)用的是 Kafka 的播放數(shù)據(jù),以及 Hbase 的一些稿件數(shù)據(jù)。對(duì)于離線來說,用的是 hive 的稿件數(shù)據(jù),以及 hive 的播放數(shù)據(jù)。除了整個(gè)離線和近線的流批打通,還將整個(gè)近線產(chǎn)生的實(shí)時(shí)的數(shù)據(jù)效果匯總到 OLAP 引擎上,通過 superset 來提供整個(gè)實(shí)時(shí)的指標(biāo)可視化。其實(shí)從圖可以看出完整的復(fù)雜流批一體的計(jì)算鏈路,當(dāng)中包含的計(jì)算節(jié)點(diǎn)是非常的復(fù)雜和龐多的。

11. 實(shí)驗(yàn)協(xié)作 - 挑戰(zhàn)

下階段挑戰(zhàn)更多是在實(shí)驗(yàn)協(xié)作上,下圖是將前面整個(gè)鏈路進(jìn)行簡化后的抽象。從圖中可以看出,三個(gè)虛線的區(qū)域框內(nèi),分別是離線的鏈路加兩個(gè)實(shí)時(shí)的鏈路,三個(gè)完整的鏈路構(gòu)成作業(yè)的流批,實(shí)際上就是一個(gè)工作流最基本的過程。里面需要去完成工作流完整的抽象,包括了流批事件的驅(qū)動(dòng)機(jī)制,以及,對(duì)于算法在 AI 領(lǐng)域上更多希望用 Python 來定義完整的 flow,此外還將整個(gè)輸入,輸出以及它的整個(gè)計(jì)算趨于模板化,這樣可以做到方便整個(gè)實(shí)驗(yàn)的克隆。

12. 引入 AIFlow

整個(gè)工作流上在下半年更多是和社區(qū)合作,引入了 AIFlow 的整套方案。

右側(cè)其實(shí)是整個(gè) AIFlow 完整鏈路的DAG視圖,可以看出整個(gè)節(jié)點(diǎn),其實(shí)它支持的類型是沒有任何限制的,可以是流式節(jié)點(diǎn),也可以是離線節(jié)點(diǎn)。此外的話,整個(gè)節(jié)點(diǎn)與節(jié)點(diǎn)之間通信的邊是可以支持?jǐn)?shù)據(jù)驅(qū)動(dòng)以及事件驅(qū)動(dòng)的。引入 AIFlow 的好處主要在于,AIFlow 提供基于 Python 語義來方便去定義完整的 AIFlow 的工作流,同時(shí)還包括整個(gè)工作流的進(jìn)度的調(diào)度。

在節(jié)點(diǎn)的邊上,相比原生的業(yè)界的一些 Flow 方案,他還支持基于事件驅(qū)動(dòng)的整個(gè)機(jī)制。好處是可以幫助在兩個(gè) Flink 作業(yè)之間,通過 Flink 當(dāng)中 watermark 處理數(shù)據(jù)分區(qū)的進(jìn)度去下發(fā)一條事件驅(qū)動(dòng)的消息來拉起下一個(gè)離線或者實(shí)時(shí)的作業(yè)。

此外還支持周邊的一些配套服務(wù),包括通知的一些消息模塊服務(wù),還有元數(shù)據(jù)的服務(wù),以及在 AI 領(lǐng)域一些模型中心的服務(wù)。

13. Python 定義 Flow

來看一下基于 AIFlow 是如何最終定義成 Python 的工作流。右邊的視圖是一個(gè)線上項(xiàng)目的完整工作流的定義。第一、是整個(gè)是 Spark job 的定義,當(dāng)中通過配置 dependence 來描述整個(gè)下游的依賴關(guān)系,它會(huì)下發(fā)一條事件驅(qū)動(dòng)的消息來拉起下面的 Flink 流式作業(yè)。流式作業(yè)也同樣可以通過消息驅(qū)動(dòng)的方式來拉起下面的 Spark 作業(yè)。整個(gè)語義的定義非常的簡單,只需要四個(gè)步驟,配置每節(jié)點(diǎn)的 confg 的信息,以及定義每節(jié)點(diǎn)的 operation 的行為,還有它的 dependency 的依賴,最后去運(yùn)行整個(gè) flow 的拓?fù)湟晥D。

14. 基于事件驅(qū)動(dòng)流批

接下來看一下完整的流批調(diào)度的驅(qū)動(dòng)機(jī)制,下圖右側(cè)是完整的三個(gè)工作節(jié)點(diǎn)的驅(qū)動(dòng)視圖。第一個(gè)是從 Source 到 SQL 到 Sink。引入的黃色方框是擴(kuò)展的 supervisor,他可以收集全局的 watermark 進(jìn)度。當(dāng)整個(gè)流式作業(yè)發(fā)現(xiàn) watermark 可以推進(jìn)到下一個(gè)小時(shí)的分區(qū)的時(shí)候,它會(huì)下發(fā)一條消息,去給到 NotifyService。NotifyService 拿到這條消息之后,它會(huì)去下發(fā)給到下一個(gè)作業(yè),下一個(gè)作業(yè)主要會(huì)在整個(gè) Flink 的 DAG 當(dāng)中去引入 flow 的 operator,operator 在沒有收到上個(gè)作業(yè)下發(fā)了消息之前,它會(huì)堵塞整個(gè)作業(yè)的運(yùn)行。直到收到消息驅(qū)動(dòng)之后,就代表上游其實(shí)上一個(gè)小時(shí)分區(qū)已經(jīng)完成了,這時(shí)下個(gè) flow 節(jié)點(diǎn)就可以驅(qū)動(dòng)拉起來運(yùn)作。同樣,下個(gè)工作流節(jié)點(diǎn)也引入了 GlobalWatermark Collector 的模塊來匯總收集它的處理的進(jìn)度。當(dāng)上一個(gè)小時(shí)分區(qū)完成之后,它也會(huì)下發(fā)一條消息到 NotifyService,NotifyService 會(huì)將這條消息去驅(qū)動(dòng)調(diào)用 AIScheduler 的模塊,從而去拉起 spark 離線作業(yè)來做 spark 離線的收尾。從里你們可以看出,整個(gè)鏈路其實(shí)是支持批到批,批到流以及流到流,以及流到批的四個(gè)場景。

15. 實(shí)時(shí) AI 全鏈路的雛形

在流和批的整個(gè) flow 定義和調(diào)度的基礎(chǔ)上,在 2020 年初步構(gòu)建出來了實(shí)時(shí) AI 全鏈路的雛形,核心是面向?qū)嶒?yàn)。算法同學(xué)也可以基于 SQL 來開發(fā)的 Node 的節(jié)點(diǎn),Python 是可以定義完整的 DAG 工作流。監(jiān)控,告警以及運(yùn)維是一體化的。

同時(shí),支持從離線到實(shí)時(shí)的打通,從數(shù)據(jù)處理到模型訓(xùn)練,從模型訓(xùn)練到實(shí)驗(yàn)效果的打通,以及面向端到端的打通。右側(cè)是整個(gè)近線實(shí)驗(yàn)的鏈路。下面是將整個(gè)實(shí)驗(yàn)鏈路產(chǎn)出的物料數(shù)據(jù)提供給在線的預(yù)測訓(xùn)練的服務(wù)。整體會(huì)有三個(gè)方面的配套:

一是基礎(chǔ)的一些平臺(tái)功能,包括實(shí)驗(yàn)管理,模型管理,特征管理等等;其次也包括整個(gè) AIFlow 底層的一些 service 的服務(wù);再有是一些平臺(tái)級(jí)的 metadata 的元數(shù)據(jù)服務(wù)。

四、未來的一些展望

在未來的一年,我們還會(huì)更加集中在兩個(gè)方面的一些工作。

第一是數(shù)據(jù)湖的方向上,會(huì)集中在 ODS 到 DW 層的一些增量計(jì)算場景,以及 DW 到 ADS 層的一些場景的突破,核心會(huì)結(jié)合 Flink 加 Iceberg 以及 HUDI 來作為該方向的落地。在實(shí)時(shí) AI 平臺(tái)上,會(huì)進(jìn)一步去面向?qū)嶒?yàn)來提供一套實(shí)時(shí)的 AI 協(xié)作平臺(tái),核心是希望打造高效,能夠提煉簡化算法人員的工程平臺(tái)。

 

 

責(zé)任編輯:梁菲 來源: 阿里云云棲號(hào)
相關(guān)推薦

2022-04-07 16:50:28

FlinkB站Kafka

2025-01-15 09:16:10

2024-05-13 10:44:22

云計(jì)算

2019-04-30 09:00:33

SQL數(shù)據(jù)庫Apache Flin

2018-11-14 13:49:16

Apache Flin唯品會(huì)架構(gòu)

2024-10-23 20:09:47

2011-05-05 14:52:10

無縫拼接拼接大屏幕

2014-01-15 16:46:07

多元化

2015-05-28 17:34:50

順豐田民IT驅(qū)動(dòng)

2022-04-24 11:27:05

邊緣計(jì)算數(shù)據(jù)自動(dòng)駕駛

2015-12-14 17:36:16

5G無線網(wǎng)絡(luò)

2010-05-13 23:34:39

統(tǒng)一通信環(huán)境

2021-03-17 07:59:36

邊緣計(jì)算遠(yuǎn)程辦公數(shù)字化轉(zhuǎn)型

2020-11-04 10:09:06

物聯(lián)網(wǎng)智慧辦公技術(shù)

2014-10-09 16:52:37

BQ企業(yè)即時(shí)通溝通

2017-12-22 17:43:58

司法大數(shù)據(jù)大數(shù)據(jù)法院

2017-05-18 11:43:41

Android模塊化軟件

2011-06-01 16:35:00

2022-09-16 08:23:22

Flink數(shù)據(jù)湖優(yōu)化

2021-08-06 15:06:09

騰訊開源Apache
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)

主站蜘蛛池模板: 一区二区三区电影网 | 黄色免费观看网站 | 中文在线一区二区 | 免费在线精品视频 | 在线欧美一区 | 亚洲二区在线 | 亚洲www啪成人一区二区麻豆 | 亚洲欧美成人 | 精品国产乱码久久久久久中文 | 欧美在线视频一区二区 | 日韩福利片| 91视频网址 | 日日综合 | 韩国精品在线 | 欧美乱操 | 成人在线欧美 | 99视频在线| 国产精品视频免费观看 | 黄色大片在线播放 | 久久国产亚洲精品 | 91精品福利| 波霸ol一区二区 | 天天干狠狠操 | 久久大陆 | 日韩欧美国产精品一区二区三区 | 无人区国产成人久久三区 | 欧美日韩精品久久久免费观看 | 一二三四在线视频观看社区 | 成人免费在线观看 | 久久久久久久av | 香蕉一区二区 | 国产在线一区观看 | 成人三区 | 国产福利在线视频 | 亚洲一区二区三区在线播放 | 久久久久91 | 国产精品福利在线 | www.婷婷 | 久久久久国产精品 | 日韩国产一区二区 | 91免费入口 |