五分鐘技術趣談 | Apache Paimon淺析及在威脅情報中的應用
Part 01
Apache Paimon是什么?
Apache Paimon是一種多功能的流數據湖平臺,支持高速數據攝取、變更數據跟蹤和實時分析,它為讀/寫操作提供靈活的架構,并與各種計算引擎(如Apache Flink、Apache Hive、Apache Spark和Trino)集成。Paimon利用列式文件存儲和LSM樹結構進行高效的數據更新和查詢。它提供連接器,用于消息隊列、OLAP系統和批量存儲的統一存儲。Paimon的表抽象可以無縫的批處理和流處理執行模式,用于數據處理。
圖1 Apache Paimon架構圖
Part 02
Apache Paimon的特點
Apache Paimon作為一個數據湖平臺,目前具有以下幾個主要特點:(1)大規模實時更新;(2)數據表局部更新;(3)流批一體讀寫。
- 大規模實時更新
Paimon 是一種新穎的數據存儲系統,它結合了湖存儲、LSM 和列式格式(如ORC、Parquet)等多種技術,為湖存儲帶來了大規模實時更新能力。其中,LSM 數據結構的追加寫能力是 Paimon 實現高性能的關鍵。Paimon 的設計使得它可以在大規模的數據輸入場景中提供出色的性能表現,同時支持快速的查詢和分析操作。其特點如下:
- 高容錯性:LSM 的多版本存儲機制,保障數據可靠性和恢復能力
- 高可擴展性:LSM 的水平擴展能力非常強,可以支持 PB 級別的數據規模
- 高靈活性:Paimon 支持多種列式格式,可以根據不同的業務需求選擇最適合的格式
- 高可定制性:Paimon 提供了豐富的配置選項,可以根據不同的場景進行優化和定制。
- 數據表局部更新
在數據倉庫的業務場景中,寬表數據模型是非常常見的。它是指將業務主體相關的指標、維表和屬性關聯在一起的模型表,也可以泛指將多個事實表和多個維度表相關聯到一起形成的寬表。這種模型能夠幫助我們更好地理解業務數據,提高數據分析的效率。Paimon開發了一個 Partial-Update 合并引擎。它可以根據相同的主鍵實時合并多條流,形成 Paimon 的一張大寬表。而且,借助 它里面的LSM 樹的延遲 Compaction 機制,我們可以用較低的成本完成合并,從而提高了數據處理的效率。舉例來說:當收到主鍵為1的以下三條數據后,它最終會合并成合并的一條數據。
## 輸入
?<1, 23.0, 10, NULL>
?<1, NULL, NULL, 'This is a book'>
?<1, 25.2, NULL, NULL>
## 輸出
<1, 25.2, 10, 'This is a book'>
同時,合并后的表可以提供批讀和流讀:
- 批讀:在批讀時,讀時合并仍然可以完成 Projection Pushdown,提供高性能的查詢。
- 流讀:下游可以看到完整的、合并后的數據,而不是部分列。
- 流批一體讀寫
作為一個流批一體的數據湖存儲,Paimon提供了流寫流讀和批寫批讀的功能。我們可以利用這些特性來構建Streaming Pipeline,并將數據沉淀到Paimon存儲中。在使用 Paimon進行數據處理的過程中,它不僅可以實時更新Flink Streaming作業的數據,還能夠支持OLAP查詢各個Paimon表的歷史和實時數據。此外,還可以通過Batch SQL對之前的分區進行回填,實現批讀批寫的功能,從而更加高效地進行數據處理。
Part 03
Apache Paimon的數據結構
在Paimon中一張表的所有數據文件都存在一個層級的目錄中。其中第一層包含3個文件夾,分別是snapshot、manifest、schema和data。snapshot文件夾主要用于存儲這個表的快照,內容包括為上一次提交產生的 manifest,加上本次提交產生的 manifest 作為增量。schema文件夾主要用于存儲這個表的元信息。manifest文件夾主要用于存儲這個一系列manifest文件,manifest記錄了每次經 checkpoint 觸發而提交的數據文件變更,包含新增和刪除的數據文件。Data文件夾按桶進行劃分。每個桶文件夾包含一個LSM樹和changelog文件。
圖2 Apache Paimon文件層級圖
其文件的更新機制如下:在Apache Paimon中,會在Sink端維護一個Memory Table,用作數據合并,數據會寫入到File Store和 Log Store當中,File Store中保存的就是經過桶分區的LSM樹存儲結構,Log Store則是保存了LSM中的 Write Ahead Log 信息。對于批讀,只需要去讀取File Store;而對于流讀,則需要混合的讀取,先讀取File Store 中的全量數據,再通過Log Store讀取變更的數據。
圖3 Apache Paimon讀寫機制
Part 04
Apache Paimon在威脅情報中的應用
中國移動智慧家庭運營中心威脅情報云平臺是基于中國移動網絡和數據資源優勢構建的。通過應用威脅情報挖掘技術和運營,該平臺為安全產品和安全分析人員提供豐富的惡意IP/域名/樣本IOC、whois、PDNS等情報查詢服務,幫助企業以較低的成本享受專業的威脅情報服務,更好地了解和應對網絡威脅,加強企業的安全防護能力。
在該平臺上,業務分析人員需要仔細分析表中的數據完成每周的報表統計。然而,原始業務數據存儲在mongoDB上,因此在大數據量的情況直接對mongoDB進行分析操作必定會對業務產生影響。為了減少對業務的影響,業務需要將mongoDB表導入到大數據平臺進行分析。考慮到情報數據的規模達到億級別,并且每天都會發生情報老化以及頻繁更新的情況,如果每天都定時進行全量更新,那將會耗費大量資源且效率低下。
因此,為了解決這個問題,我們采用了Flink CDC技術和Apache Paimon數據湖。通過Flink CDC采集mongoDB的oplog,我們能夠實現數據的增量更新到Apache Paimon中,從而提高了同步效率并降低了資源消耗。這種方法使得數據更新更加高效且無需大量的資源投入。
下面是一個Flink SQL通過CDC同步mongoDB情報表到paimon的例子。首先創建一張dim_mongo_threaten_score的mongodb-cdc表,接著創建一張dim_fts_threaten_score的paimon表,最后把dim_mongo_threaten_score導入到dim_fts_threaten_score。
CREATE TABLE IF NOT EXISTS dim_mongo_threaten_score (
_id STRING,
threaten_type STRING,
credit_level STRING,
threaten_score STRING,
threaten_source STRING,
created_time TIMESTAMP(3),
updated_time TIMESTAMP(3),
attack_time STRING,
source STRING,
ip STRING,
domain STRING,
device STRING,
iphone STRING,
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb-cdc',
'hosts' = '......',
'username' = '......',
'password' = '.....',
'database' = '......',
'collection' = '......',
'heartbeat.interval.ms' = '......',
'poll.max.batch.size' = '......'
);
CREATE CATALOG fts_catalog WITH (
'type'='table-store',
'warehouse'='hdfs:///......'
);
USE CATALOG fts_catalog;
CREATE TABLE IF NOT EXISTS dim_fts_threaten_score (
_id STRING,
threaten_type STRING,
credit_level STRING,
threaten_score STRING,
threaten_source STRING,
created_time TIMESTAMP(3),
updated_time TIMESTAMP(3),
attack_time STRING,
source STRING,
ip STRING,
domain STRING,
device STRING,
iphone STRING,
PRIMARY KEY(_id) NOT ENFORCED
) with (
'bucket' = '......',
'snapshot.time-retained' = '......'
);
insert into fts_catalog.`default`.dim_fts_threaten_score
select * from default_catalog.default_database.dim_mongo_threaten_score;
從上面的腳本可以看出,我們采用了非常簡潔高效的方法來實現從mongoDB到Apache Paimon的數據增量同步。只需要建立兩張表并添加一個簡單的insert語句,就可以完成整個同步過程。最后,從同步效率上來看,從原先的天級延遲到現在的秒級延遲,其提升顯著;從資源消耗上來看,CPU從原先的8核減少到現有的4核,其提升也非常明顯。
Part 05
總結展望
Apache Paimon做為新一代數據湖,其支持高速數據攝取、變更數據跟蹤和實時分析,并為讀/寫操作提供靈活的架構,并與各種計算引擎集成。由于其強大的性能,目前已完成了CDC同步業務數據庫數據到數據湖的場景,實現了自動化數據集成。在未來,也可以基于PartialUpdate機制實現準實時寬表,解決大寬表延遲高和資源浪費的情況。同時,也可以基于AppendOnly機制來替換消息隊列,達到解耦和降本增效的效果。