構建下一代萬億級云原生消息架構:Apache Pulsar 在 vivo 的探索與實踐
作者 | vivo互聯網大數據團隊 - Chen Jianbo、Quan Limin
本文整理自 vivo 互聯網大數據團隊在 Apache Pulsar Meetup 上的演講《Apache Pulsar 在 vivo 的探索與實踐》,介紹 vivo 在集群管理與監控上應用 Pulsar 的實踐。
vivo 移動互聯網為全球 4 億 + 智能手機用戶提供互聯網產品與服務。其中,vivo 分布式消息中間件團隊主要為 vivo 所有內外銷實時計算業務提供高吞吐、低延時的數據接入、消息隊列等服務,覆蓋應用商店、短視頻、廣告等業務。業務集群已達每天十萬億級的數據規模。
圖 1. vivo 分布式消息中間件系統架構
上圖為系統的整體架構,其中數據接入層包括數據接入、數據采集服務,支持 SDK 直連;消息中間件由 Kafka 和 Pulsar 共同承擔,其中 Pulsar 的承載量達到千億級別;數據處理部分使用 Flink、Spark 等組件。
目前,Kafka 采用多集群方式,根據不同的業務量級、重要性分別使用不同的集群提供服務,比如計費集群、搜索集群、日志集群。在 Kafka 集群的內部,則采用物理隔離的方式,根據不同業務的重要性,將不同業務的 Topic 控制在不同的資源組內,避免業務之間相互影響。
圖 2. Kafka 集群資源隔離
圖 3. Kafka 集群流量均衡
資源組內部則會針對 Topic 流量、分區分布、磁盤容量、機器機架等指標生成遷移計劃進行流量均衡,以此增強 Kafka 可靠性。目前 Kafka 已在多集群部署、資源隔離、流量均衡三個方面保障了基本的穩定性和資源利用率,但是在此之外,系統仍存在一些問題。
一、應對業務流量數十倍增長,引入 Apache Pulsar
過去幾年來,Kafka 集群承載的業務量迅速增長,流量上漲數十倍,帶來諸多問題:
- Topic 及 Topic 分區總量不斷增加,集群性能受到影響:Kafka 高性能依賴于磁盤的順序讀寫,磁盤上大量分區導致隨機讀寫加重;
- 業務流量增加迅速,存量集群變大,需要將老的業務進行資源組隔離遷移或者集群拆分。無論是資源組隔離還是集群的隔離的方式,由于集群不可以進行動態擴縮容,機器不能夠進行靈活調配,都存在利用率不高、運維成本增加的問題;
- 機器擴容慢,需要做長時間流量均衡,難以應對突發流量。集群規模越大,問題越突出;
- 消費端性能擴展太依賴分區擴容,導致集群元數據瘋狂增長;
- 集群數量對應的機器基數大,硬件故障概率高,出現硬件故障時影響會直接傳導到客戶端,缺少中間層容錯。
面對龐大的集群、流量和多樣化的業務場景,綜合考慮集群的穩定性和維護成本等因素,vivo 需要一個功能更豐富、適用更多場景、擴展能力更強的消息組件。
Pulsar 如何解決 vivo 存在的問題,可以首先看一下 Pulsar 的架構設計。Pulsar 采用計算存儲層分離架構。計算層的 Broker 節點是對等且無狀態的,可以快速擴展;存儲層使用 BookKeeper 作為節點,同樣節點對等。這種分離架構支持計算和存儲層各自獨立擴展。
圖 4. Pulsar 存儲計算分離
其次,Pulsar 的各個節點都是輕量化的,在出現故障和宕機時可以快速恢復。一般情況下可以通過快速上下線來解決某個節點機器的問題。同時 Broker 層可以作為 BookKeeper 層的容錯層,可以防止故障直接傳導至用戶端。
Pulsar 擴容時無需長時間的數據遷移,且支持實時均衡。Broker 層抽象了 Bundle 概念,可以用有限的 Bundle 映射海量 Topic,Topic 可以隨著 Bundle 遷移,通過動態遷移 Bundle 可以更好地應對流量突發場景。BookKeeper 分層分片的架構讓數據分布均勻,在 Broker 層有一個選擇機制,在擴容時可以將數據寫入存儲量小的節點,擴容時無需數據遷移,提供更好的流量高峰應對能力。Bookie 進行數據刷盤時會對批量數據自動進行數據排序,可以避免 Kafka 中的隨機讀寫。
Pulsar 提供了四種消息模型:Exclusive、Failover、Shared 和 Key_Shared,其中 Shared 模型允許一個分區同時被多個消費實例訂閱消費,并采用 Round Robin(輪詢)方式將數據推送到各個消費實例。因此消費能力的擴展不會過于依賴分區擴容,慢消費的消費實例也可以在 Shared 模型中得到解決。Key_Shared 模型則是在 Shared 的基礎上對應對順序性有要求的場景,可以按照 Key 來消費。
圖 5. Pulsar 訂閱模型
Pulsar 的設計架構帶來了海量分區支撐、消費擴展、精準限流、流量均衡、快速擴縮容、故障恢復、分層存儲、云原生容器部署、異地多活等特性和優勢,可以幫助集群更好地實現高可用、高擴展,提高了更高的彈性。
二、Apache Pulsar 集群管理實踐
下面我們從流量控制和數據管理方面,分享 vivo 在使用 Pulsar 過程中的集群管理經驗。
2.1 Bundle 的管理
在集群流量控制層面,比較關鍵的一點就是 Bundle 的管理。Bundle 負責控制用戶流量到 Broker 的具體分布。Broker 與 Topic 之間沒有直接聯系,而是在 Broker 之上抽象出 Bundle 概念,通過 Bundle 與 Topic 建立關系;Topic 通過名稱計算哈希值,并散列分布到一致性哈希環中,而哈希環的每一段都是一個 Bundle。另外 Load Manager 根據 Bundle 的負載情況將后者分配到對應的 Broker 上,將 Bundle 數據存儲在 ZooKeeper 中。由此以來就間接實現了 Topic 與 Broker 之間的聯系(可參考近期 StreamNative 發布的 Broker 負載均衡技術文章)。?
圖 6. Bundle 與 Topic 建立關系
這里需要注意:
- Bundle 的個數影響均衡效果,因為通過一致性哈希來確認 Topic 應該落在哪個 Bundle 上, Topic 與 Bundle 會存在不均衡分配,某些 Bundle 分配的 Topic 可能較多或較少。Bundle 越多,每個 Bundle 承載的 Topic 越少,粒度越細。依賴于 Pulsar 的負載均衡算法,均衡效果更好;否則若 Bundle 太大,無論如何卸載都很難平衡負載;
- Bundle 數據和 Broker 映射元數據都維護在 ZooKeeper 中,需要做好 Bundle 數量的規劃。
針對以上兩點,我們根據 Broker 來設置 Bundle 數量設置最小最大值來控制,還可以對流量較大的 Topic 針對性擴大分區,讓分區均勻分配到 Broker Bundle 上。
Pulsar 雖然提供了海量分區能力,但是過多的 Topic 或者分區產生的 lookup 也會對集群產生較大的壓力。集群管理者需要提前規劃 Bundle 和分區設置,杜絕濫用。
另外對 Bundle 的操作需要注意:
- Pulsar 本身提供了卸載操作,可以解除 Bundle 和 Broker 的關聯關系,將 Bundle 重新分配。線上流量較大時應卸載 Bundle 而不是整個命名空間,因為卸載后者會導致其上的全部 Bundle 與對應的生產者、消費者斷開,重新進行 lookup。
- 利用 Bundle split 對流量較大的 Bundle 進行拆分,增加命名空間的 Bundle 數量,降低影響。
總體而言,用戶需要注意流量的均衡與集群的穩定性,在集群管理之初就做好 Bundle 的數量管理和相關測試,謹慎對待大批量 Bundle 卸載等運維操作。
2.2 數據的管理
接下來我們從數據的存儲、過期、刪除三個方面來分析。
(1) Ledger 翻轉
首先介紹數據寫入 ledger 的過程。每一個 Topic 分區在一段時間內只創建一個 Ledger 維護分區寫入的 Entry 的數據歸屬。Topic 分區寫入的數據以 Entry 的形式,經過 Broker 寫入 Netty 線程處理隊列,線程依次根據 Entry 的 Ledger Id,對 Ledger 目錄數取模,寫入到目標磁盤 Ledger 目錄,最終以 Entry Log 和 RocksDB 的索引方式存儲。需要注意,Ledger 是一個分區在一段時間內寫入數據的邏輯管理單位,維護了這段數據存儲的 Bookie 位置。一個 Topic 分區在一段時間內寫入的數據只被一個活躍 Ledger 管理,待該 Ledger 達到翻轉條件后才會關閉 Ledger 并重新計算,創建新 Ledger 繼續寫入。?
圖 7. Ledger 翻轉示意
Ledger 翻轉后,數據才會寫入新的數據目錄。在 Pulsar 中,在滿足 Ledger 最小翻轉時間以及以下條件之一后觸發 Ledger 翻轉:
- 已達到 Ledger 最大翻轉時間;
- 已達到 Ledger 的最大 Entry 數量;
- 已達到 Ledger 的最大大小。
默認值:
觸發ledger翻轉的最小時間:
managedLedgerMinLedgerRolloverTimeMinutes=10
觸發ledger翻轉的最長時間:
managedLedgerMaxLedgerRolloverTimeMinutes=240
觸發ledger翻轉的最大entry數:
managedLedgerMaxEntriesPerLedger=50000
觸發ledger翻轉的最大大小:
managedLedgerMaxSizePerLedgerMbytes=2048
注意兩個問題:
- Ledger 過大:最小翻轉時間是防止 Ledger 元數據過快增長的手段,但實踐發現如果 Topic 分區流量較大,Ledger 的實際值可能遠超上述設置的上限閾值。Ledger 只有在翻轉后才會創建新的 Ledger,Ledger 過大會導致某段時間內寫入某個磁盤的數據過多,產生磁盤存儲不均衡的問題;針對 Ledger 為對象的一些操作也會受到影響,產生無法及時卸載數據到二級存儲、數據卸載時間較長、還未卸載成功但 Ledger 已經過期等問題。
- Ledger 間不均衡:Ledger ID 以集群維度進行遞增。在分區的維度,按照 Ledger ID 對 Ledger 存儲目錄數進行取模的方式無法對多磁盤進行均衡寫入。但保持 Ledger 間的大小一致,在一定程度上會對多磁盤目錄的寫入均衡有比較大的改善。
總而言之,建議根據業務消息情況適當調整 Ledger 翻轉參數和有針對性地增加大流量 Topic 分區數量,可以防止 Ledger 過大、大小不均衡的問題。
(2)數據過期
數據過期主要分為四個階段:
第一階段:未被 Ack 的消息
Backlog 消息:該段數據不會被刪除
第二階段:已經 Ack 的消息
- 訂閱主動 Ack 后,標記為非 backlog 消息,有多個訂閱時以最慢的為準
- TTL:若某 Topic 沒有活躍訂閱,超過 TTL 存活時間的消息會被主動 Ack ,本質上是移動 cursor
第三階段:消息保留時間檢查
Retention:對已經 Ack 的消息的保留策略,按保留周期和保留大小設置來保留消息。
第四階段:消息刪除
Deleted:超過 Retenion 范圍的消息則被刪除。超過 rentention 保留周期和保留大小的消息,系統會從當前已經 ack 消息的最新位置往前檢查并獲取已經過期的 ledger,將其標記刪除。
圖 8. 消息保留時間檢查與消息刪除
從上述的消息階段演化來看,Pulsar 提供了較大的消息管理空間,但也略顯復雜。建議集群維護者建立簡單統一的規則處理數據保留策略,如可以設置 TTL = Retention 保留周期值。
(3) 數據刪除
此處介紹數據的物理刪除。Bookie 在處理數據寫入過程時,會將同一段時間內的數據經過排序 flush 到同一個 Entry Log 文件中,將索引存放在 RocksDB 中。由于多個 Ledger 的數據可能會同時寫入同一個 Entry Log,因此 Entry Log 便不能被簡單直接的刪除。對此 BookKeeper 會啟動一個 GC(GarbageCollector) 線程進行檢查和物理刪除操作。?
圖 9. 數據物理刪除流程
Entry Log 維護元數據信息( EntryLogMetadata),該元數據記錄了 Ledger 列表、大小與剩余有效數據比例。
GC 清理線程在每個 gcWaitTime 時間間隔:
- 掃描 Entry Log 的元數據信息,對于已經沒有有效數據的 entry log 直接進行刪除。
- 判斷是否滿足 compaction 條件,滿足 compaction 條件后 GC 線程會讀取每一個 Entry 判斷其是否過期,一旦過期就會丟棄,否則會將數據寫入新的 Entry Log。
Compaction 分為 minorCompaction 和 majorCompaction,二者區別在于閾值。默認情況下,minorCompaction 清理間隔 1 小時,閾值 0.2;majorCompaction 清理間隔 24 小時,閾值 0.8。閾值是 Entry Log File 中的剩余有效數據占比。?
minorCompactionInterval=3600
minorCompactionThreshold=0.2
majorCompactionThreshold=0.8
majorCompactionInterval=86400
在實際使用中,如果機器節點的磁盤較小且數據遲遲得不到刪除,為了及時清除數據,應該按照業務流量和磁盤空間適當調整數據清理間隔時間、有效數據閾值,并配合 compaction 限速策略減小對集群的影響。
三、Pulsar 監控實踐
vivo 的 Pulsar 指標監控鏈路架構如下:?
圖 10. vivo 針對 Pulsar 監控指標搭建的監控架構
該架構中:
采用 Prometheus 采集 Pulsar 指標;
- 應用 Prometheus 遠程存儲特性將格式化后的指標發送到 Kafka;
- Druid 消費 Kafka 數據后可以作為 Grafana 的數據源,配置 Grafana 面板查詢指標。
為什么不使用 Prometheus 存儲數據?因為有些數據較久遠,一旦集群規模增加,監控指標數量級會很大。Prometheus 對資源依賴重,我們只采用了它的采集能力。
下圖是常用的關鍵指標:
圖 11. 關鍵監控指標
指標類型分為:
- 【客戶端指標】:用來排查客戶端出現的異常
- 【Broker 端指標】:監控 topic 流量、調整 broker 間流量差距
- 【Bookie 端指標】:排查讀寫延遲等問題
除了官方指標外,團隊還開發了 Bundle 相關的一些指標:
- 分區數、流量等在 Bundle 的分布
- Broker 端記錄讀寫延遲的 P95/P99 值
- 基于請求對列實現 Broker 端網絡負載指標等。
四、問題優化與最佳實踐
4.1 負載均衡參數
負載均衡的目的是對資源平均分配,差異大會影響穩定性。對負載均衡設置的目標是節點流量偏差 20% 以內,每天均衡頻次在 10 次以內,否則客戶端會頻繁斷連、重連。優化后的參數如下:?
# load shedding strategy, support OverloadShedder and ThresholdShedder, default is OverloadShedder
loadBalancerLoadSheddingStrategy=org.apache.pulsar.Broker.loadbalance.impl.ThresholdShedder
# enable/disable namespace Bundle auto split
loadBalancerAutoBundleSplitEnabled=false
# enable/disable automatic unloading of split Bundles
loadBalancerAutoUnloadSplitBundlesEnabled=false
#計算新資源使用量時的CPU使用權重(默認1.0)
loadBalancerCPUResourceWeight=0.0
#計算新的資源使用量時的堆內存使用權重(默認1.0)
loadBalancerMemoryResourceWeight=0.0
#計算新資源使用量時的直接內存使用權重(默認1.0)
loadBalancerDirectMemoryResourceWeight=0.0
下面三個參數改為零,是因為集群使用了相同的機型,團隊更關注流量均衡,對內存和 CPU 不是特別關注。
以一個具體產品案例來看,其中有 1 個 Topic、30 個分區、180 個 Bundle:
圖 12. 1 個 Topic、30 個分區、180 個 Bundle 的每秒入流量
上圖節點間流量差異較大,由 Bundle unload 導致。
圖 13. 1 個 Topic、30 個分區、180 個 Bundle 下,Bundle 上 Topic 分區情況
上圖可看出,有兩個 Bundle 分配了四個分區,遠超其他 Bundle。實踐中出現以下問題:
- 均衡頻次高,一天大概有 200 多次
- 客戶端連接頻繁切換,流量波動大
- 每個 Bundle 的分區數量分布差異大?
圖 14. 1 個 Topic、30 個分區、180 個 Bundle 的入流量分布
優化過程中,關鍵在于將分區打散到不同 Bundle 上,但分區數量太少很難做到。Topic 通過哈希算法分配到 Bundle 上在前文已經介紹。此案例中,問題在于分區數量少。
于是團隊將分區增加到 120 個,效果如下:
- 節點間流量差異小
- 均衡頻次降低,一天大概有 10 次左右
- 客戶端連接切換減少,流量波動較小
- 每個 bundle 的分區數量分布差異降低?
圖 15. 1 個 Topic、120 個分區、180 個 Bundle 的每秒入流量
圖 16. 1 個 Topic、120 個分區、180 個 Bundle 下,Bundle 上 Topic 分區情況
圖 17. 1 個 Topic、120 個分區、180 個 Bundle 的入流量分布
4.2 客戶端發送性能
在和上述業務相同的場景中,分區數量增加后,系統滾動重啟后出現了流量下降情況:
圖 18. 單個 Topic,30 個分區增加到 120 個,系統滾動重啟后流量下降
客戶端配置參數:
- memoryLimitBytes=209715200 (默認為 0)
- maxPendingMessages=2000 (默認 1000)
- maxPendingMessagesAcrossPartitions=40000 (默認 50000)
- batchingMaxPublishDelayMicros=50 (默認 1 毫秒)
- batchingMaxMessages=2000 (默認 1000)
- batchingMaxBytes=5242880 (默認 128KB)
滿足三個 batch 數據中的任何一個的情況下就會觸發打包、發送。
圖 19. 重啟后 maxPendingMessages(隊列長度)出現下降
這里 maxPendingMessages(隊列長度)
=min(maxPendingMessages,maxPendingMessagesAcrossPartitions/partitionNum) 。
而分區數添加(30 -> 120)后,需要重啟客戶端才對隊列長度生效。重啟后 maxPendingMessages 隊列長度 從 40000/30 = 1333 變為 40000/120 = 333,出現了明顯下降。
另外,測試發現 batchingMaxMessages 調小后性能提升 10 倍之多:
圖 20. 單個 Topic,30 個分區增加到 120 個,調整后性能提升
建議 batchingMaxPublishDelayMicros 不要過大,確保 batchingMaxMessages 比 maxPendingMessages 要大,否則等待 batchingMaxPublishDelayMicros 才會發送。
4.3 宕機導致集群流量驟降
某個分區隊列滿后會導致發送線程阻塞,影響所有分區的整體發送和集群穩定性:?
圖 21. 執行 Kill-9 一臺 Broker 后,其他 Broker 流量下降
圖 22. 第四個分區已滿,發送線程阻塞在 canEnqueRequest 上,等待時間長,其他未滿分區的發送也被影響。
圖 23. 極端情況下,第四個分區已滿,其他分區等待中。發送線程會在第四個分區阻塞等待,其他線程無法發送。
針對這一問題的優化思路,首先是能者多勞,讓發送快的分區盡可能多發送;然后是將阻塞點從 ProducerImpl 移到 PartitionedProducerImpl;如果分區 ProducerImpl 出現隊列已滿阻塞較長時間,就將該分區排除。
圖 24. 宕機導致集群流量驟降優化思路
實踐中可分為可用 Producer 和不可用 Producer 兩個列表。在 ① 中,兩個列表都處于初始化狀態并可用;在 ② 中,某個可用分區阻塞一段時間后可以等待一段時間;若不可用就移動到不可用列表中,如 ③ 所示;當分區可用比例達到閾值再挪回可用列表,如 ④ 所示。
經過優化后,宕機 Broker 流量可以快速轉移到其他 Broker:
圖 25. 優化后 Broker 流量分流并上漲
注:優化只支持 RoundRobinPartitionMessageRouterImpl 路由策略。
在單個 ProducerImpl 對應的 Broker 出現處理慢、網絡慢等導致發送響應慢的情況,都可能會導致發送線程阻塞,業務發送消息的速度受限于最慢的 ProducerImpl 的速度。
五、未來展望
本文分享了 vivo 在 Pulsar 集群管理與監控的經驗,并介紹 vivo 在負載均衡等方面的最佳實踐。
由于服務端的問題很難通過監控指標進行分析,vivo 在未來計劃實現生產端到消費端的全鏈路監控能力。大數據團隊希望整合大數據組件,支撐 Flink、Spark、Druid 等核心下游組件打通落地。
同時,vivo 內部目前 Pulsar 與 Kafka 同時運行,團隊將嘗試基于 KoP 對存量 Kafka 萬億流量嘗試遷移,降低 Kafka 遷移成本;并探索容器化落地,充分發揮 Pulsar 云原生優勢。