每天處理千億級日志量,Kafka是如何做到的?
之前為大家分享了不少 Kafka 原理解析類的干貨,今天咱們一起來看看 360 基于 Kafka 千億級數據量的深度實踐!
圖片來自 Pexels
本文主要圍繞如下內容分享:
- 消息隊列選型
- Kafka 在 360 商業化的現狀
- Kafka Client 框架
- 數據高可用
- 負載均衡
- 鑒權、授權與 ACL 方案
- Quota 機制
- 跨 IDC 的數據同步
- 監控告警
- 線上問題及解決方案
消息隊列選型
當時主要考慮以下幾個維度:
- 社區活躍度
- 客戶端支持
- 吞吐量
對比幾個系統下來,覺得 Kafka 比較符合我們的要求。現在有一個新的開源系統 Pulsar,我覺得也可以嘗試一下。
Kafka 設計上的亮點如下:
Kafka 性能和吞吐都很高,通過 Sendfile 和 Pagecache 來實現 Zero Copy 機制,順序讀寫的特性使得用普通磁盤就可以做到很大的吞吐,相對來說性價比比較高。
Kafka 通過 Replica 和 ISR 機制來保證數據的高可用。
Kafka 集群有兩個管理角色:
- Controller 主要是做集群的管理。
- Coordinator 主要做業務級別的管理。
這兩種角色都由 Kafka 里面的某個 Broker 來擔任,這樣 Failover 就很簡單,只需要選一個 Broker 來替代即可。
從這個角度來說 Kafka 有一個去中心化的設計思想在里面, 但 Controller 本身也是一個瓶頸,可以類比于 Hadoop 的 Namenode。
CAP 理論相信大家都有了解過,分布式系統實現要么是 CP,要么是 AP。
Kafka 實現比較靈活,不同業務可以根據自身業務特點來對 Topic 級別做偏 CP 或偏 AP 的配置。
支持業務間獨立重復消費,并且可以做回放。
這個是 Kafka 的簡要架構,主要分為:
- 生產端
- Broker 端
- 消費端
日志有三個層次:
- 第一個層次 Topic
- 第二個層次 Partition(每個 Partition 是一個并行度)
- 第三個層次 Replica(Replica 表示 Partition 的副本數)
Kafka 在 360 商業化的現狀
目前集群有千億級數據量,100 多臺萬兆機器,單 Topic 的最大峰值 60 萬 QPS,集群的峰值大概在 500 萬 QPS。
我們的物理機配置 24Core/10G 網卡/128G 內存/4T*12 HDD,值得說一下的是我們采用了萬兆網卡加普通磁盤 4T*12 的配置,測下來磁盤吞吐和網絡吞吐是能夠匹配上的。
再者考慮到我們的數據量比較大,SSD 盤沒有特別大的且成本比較高。
磁盤的組織結構我們用的是 JBOD,RAID10 也是很好的方案(磁盤成本會翻倍)。
我們目前的 Kafka 版本是 1.1.1,推薦大家部署 0.11 以上的版本會好一些,這個版本對協議做了很多優化,對于后續的 2.x 版本都是兼容的。
這個是我們 Kafka 上下游相關的組件,生產端主要是各種 Kafka Clients/實時服務/Flume/Logstash。
消費端分為實時,離線(ETL),監控三部分。實時有 Spark/Flink/Storm 等主流框架, 離線部分我們基于 Flink 自研了一個統一落地框架 Hamal,從 Kafka 消費一遍數據就可以落地到多個下游系統(HDFS、Hbase、Redis等),可以避免重復消費。
還有部分是監控的需求,我們把 ES/InfluxDB 相關的日志打到 Kafka,然后再消費出來通過 Grafana 展示,但目前我們已經切到 Prometheus 上了。
Kafka Client 框架
為什么要做這個框架呢?之前有很多的業務部門用裸 API 自己去實現 Kafka Client 的邏輯。
但是會有很多問題,有一些異常情況會 Catch 不全,我們做這個框架是想把所有的細節屏蔽掉,然后暴露出足夠簡單的接口。
這樣可以減少業務犯錯的可能性,我們要確保極端的情況下比如網絡或集群異常時的可用性,如果網絡或集群不可用,數據會先落到本地,等恢復的時候再從本地磁盤恢復到 Kafka 中。
我們實現了兩個框架:
- LogProducer,支持 at least once。
- LogConsumer,支持 at least once 和 exactly once 兩種語意,其中 exactly once 需要業務去實現 Rollback 接口。
LogProducer 框架的大體思路是通過內存隊列將日志發送到 Kafka,當 Kafka 或網絡不可用的情況下會寫本地磁盤,同時會有一個線程去實時檢測 Kafka 或者網絡的可用情況,如果恢復就會加載磁盤日志并發送到 Kafka。
我們還支持一種共享內存的策略來代替內存,使用共享內存是為了減少重啟過程中日志的丟失數。
LogConsumer 的框架實現,通過 Blocking Queue 將 Consumer 線程和 Worker 線程解耦,因為現實情況是消費邏輯很簡單,但是處理邏輯會很復雜。
這樣就可以對 Consumer 線程和 Worker 線程做不同的配置,同時通過 Blocking Queue 還可以實現反壓機制。
比如 Worker 處理不過來了,這時候 Blocking Queue 就會滿,反壓到 Consumer 線程會停止消費。
同時我們在 Worker 線程接口里面會提供接口讓用戶提交到 global offsetmap。
如上圖我們提供三個組合接口,如果在業務處理與 Commit 中實現了業務端 Rollback 邏輯, 那么就是 exactly once 語義,默認是 at least once 語義。
數據高可用
之前講過 Kafka 本身提供 Replica+ISR 的機制來保證數據高可用,但我們覺得這個可能還不夠,所以我們還要支持 Rack Aware。
比如 Replica=3 的情況,確保三個副本在不同的物理 Rack 上,這樣我們最多能容忍兩個物理機架同時出問題而數據仍可用,我們 Rack Aware 方案是與負載均衡方案一起做掉的,具體后面會講。
值得注意的是 Kafka 官方也支持 Rack Aware,通過在 Broker 端配置 broker.rack 參數可實現。
但有一個限制,必須為每個 Rack 分配數量相同的 Brokers,否則會導致 Replica 分配傾斜,實際情況是 IDC 的 Rack 是很多的,分配到的物理機分布也可能很隨機。
一個可以參考的解決思路是采用虛擬 Rack Group 的概念,比如維護 3 個虛擬 Rack Group,申請到的物理機加入到這 3 個 Group 中,并確保 Rack Group 間分配的物理機數量一致。
當然 Rack Group 間物理機不應存在有相同物理 Rack 的情況。
負載均衡
Kafka 的負載均衡功能在 Confluent 商業版本才支持,負載均衡本質上來說是 Replica 分配均勻問題。
我們一開始想通過經典一致性 Hash 來解決,如下圖:
然后我們發現經典一次性 Hash 不能滿足我們的需求,比如要加一個節點 node5,只能分擔節點 node2 的部分負載,不能做全局節點的負載均衡。
于是我們基于虛擬節點的一次性 Hash 的算法實現了一個方案,如圖所示:相同的顏色對應同一個物理機,Hash 環上的都是虛擬節點。
這里有四個物理節點,其中 node4 是我們新加的節點。通過虛擬節點可以把物理節點的負載足夠均衡地分散出去,所以當我把 node4 加到 Hash 環上的時候,分擔了所有物理機的負載。
算法實現的步驟分為兩個大的步驟:
①新建 hash circle:通過 vnode_str(比如 hostname-v0)做一個 MD5 的 Hash,得到虛擬節點的 vnode_key,再用 ring 字典來保存虛擬節點到物理節點的映射,同時將 vnode_key 加入到 sorted_keys 的 list 中。
②在 Hash 環中分配 Replica:將(topic_name+partition_num+replica_num)作為 Key 用相同的 MD5 Hash 算法得到 replica_key。
接著二分查找該 replica_key 在 sorted_keys 中的 Position, 最后用 Ring 字典來映射到物理機 Node,至此 Replica 分配完成。
我們基于這個算法解決三個問題:
- 添加物理節點只需遷移很小一部分數據。
- 對不同配置的物理機做權重設置,可以支持異構集群的部署。
- 實現 Replica 的 Rack Aware,物理節點上面會有 Rack 信息,在為 Replica 分配物理節點的時候會記錄已經分配的 Rack 信息。
如果有重復的情況,就會把 vnode_key 找到 Position 的位置 +1 找下一個物理節點,我們會確保三個 Replica 的物理 Rack 一定是不一樣的(假如 Replica=3)。
Leader Balance:這是一種快速且成本低的負載 Balance 方法,因為 Kafka 只有 Leader 提供讀寫,所以通過 Leader 切換是可以達到負載切換的效果的,由于只是 Leader 切換不涉及數據同步,因此這個代價是比較小的。
Disk Rebalance:這個 Feature 需要 Kafka1.1.0 版本之后才支持,Kafka 提供了一些腳本和 API 可以做 Balance 操作, 其本質也是生成 Replica Plan 然后做 Reassign。
鑒權、授權和 ACL 方案
如果是新集群比較推薦基于 SASL 的 SCRAM 方案,實施起來比較簡單。
如果老集群想中途施行鑒權授權機制會比較困難,需要推各個業務去修改配置,同時切換的過程也很容易出問題。
下面介紹下我們實現的一個白名單機制來解決老集群的問題,首先將老業務加入到白名單中,讓新業務通過工單流程來申請 Topics 和 Consumers 兩種資源權限并加到白名單里,定期監測非法(沒有走工單)Topics,Consumers 資源。
同時將這些資源都 Deny 掉,這樣就收緊了 Topics 和 Consumer 讀寫權限的口子,同時原有業務不會有任何影響。
Quota 機制
Quota 主要是為了解決多個業務間資源搶占問題。Quota 類型有兩種:
- 一種是限制網絡帶寬。
- 一種是限制請求速率(限制 CPU)。
我們對業務做了三個優先級設置:高,中,低優先級,高優先級不做限制,中優先級可容忍 lag,低優先級極端情況可停掉,通過工具可以批量限制某個優先級的所有業務,可以確保高優先級業務及集群的安全。
跨 IDC 的數據同步
首先我們為什么要做跨 IDC 的數據同步?沒做這個同步之前業務可能對數據的讀寫沒有一個 IDC 的概念,所以很容易就會有跨 IDC 的讀寫,多個業務還可能有重復 Consume 和 Produce。
這就造成跨 IDC 網絡的極大浪費, 加上跨 IDC 的網絡并不穩定,有時候會有一些異常,業務也不一定能很好處理。
為了解決以上問題,我們統一做了跨 IDC 的數據同步服務,首先我們約定業務只能做本 IDC 的讀寫,不允許做跨 IDC 的讀寫,如果有跨 IDC 的數據需求,要向我們申請,通過 Mirrormaker 去同步一份過來。
這樣做有兩個好處:
- 一是屏蔽了異常對業務的影響。
- 二是節省了 IDC 之間的帶寬(我們通過同步機制能保證這份數據只傳輸一份)。
我們還基于 Marathon/Mesos 對這個服務做了 Pass 化,提高了服務的 SLA。
監控告警
我們的監控警告平臺如下:
- 基于 Jmx exporter+Promehteus+Grafana 來做圖表展示,在每個 Broker 上面部署 Jmx exporter,Prometheus 會去 Pull 這些數據,最后通過 Grafana 來展示。
- 基于 Kafka Manager 做瞬態指標的監控。
- 基于 Burrow 做 Consumer lag 的監控。
- 基于 Wonder 來做告警,這個是 360 內部實現的一個組件,類似 Zabbix。
線上問題及解決方案

磁盤故障:我們通過 Smartctl 來監測,首先狀態是要 Passed 的,其次我們會判斷 197 Current_Pending_Sector 這個屬性值不能大于 100, 如果大于 100 這個磁盤可能有讀寫性能問題。
bootstrap.servers 性能瓶頸:該參數可以配置多臺 Broker,這些 Broker 作為 Proxy 的角色為 Kafka Clients 提供 Lookup 服務。
如果集群規模很大,Clients 很多的情況下,這些 Proxy 角色的 Broker 的負載會很大,為了解決這個問題,我們對 bootstrap.servers 參數做了 VIP 配置。
每個 VIP 可以綁定任意多的 Brokers,這樣在客戶端不需要修改配置的情況下可以對 Proxy 動態擴縮容。
Consumer 重啟不消費:業務反饋消費停止,重啟也不能夠解決問題,后來定位發現是早于 0.11 之前版本的 Bug:
- https://issues.apache.org/jira/browse/KAFKA-5413
原因是 log cleaner 線程掛了導致 Compact 停止,__consumer_offsets 這個 Topic 的量非常大,broker reload 時間特別長,這段時間是停止服務的。
解決方法有兩個:
- 一是升級到 Kafka 0.11+ 版本
- 二是將 Offset 遷移到新的 Consumer Group 來解決(規避掉有問題的 Coordinator)。
嚴鎖鵬
嚴鎖鵬,奇虎 360 大數據架構運維專家,具有 10 年基礎架構與大數據開發經驗。2013 年加入 360 商業化團隊,負責消息中間件開發與運維,同時涉及大數據架構、微服務架構、實時計算平臺、機器學習平臺、監控系統等基礎設施建設,致力于為商業化團隊提供穩定高效的基礎服務。