走近Kafka:大數據領域的不敗王者
一、引言
1.背景
和 RabbitMQ 類似,Kafka(全稱 Apache Kafka)是一個分布式發布-訂閱消息系統。
自 Apache 2010 年開源這個頂級實用項目以來,至今已有十數年,Kafka 仍然是非常熱門的一個消息中間件,在互聯網應用里占據著舉足輕重的地位。
甚至,技術圈一度將 Kafka 評為消息隊列大數據領域中的最強王者!
Kafka 以其速度快(ms 級的順序寫入和零拷貝)、性能高(TB級的高吞吐量)、高可靠(有熱擴展,副本容錯機制能力)和高可用(依賴Zookeeper作分布式協調)等特點聞名于世,它非常適合消息、日志和大數據業務的存儲和通信。
本文接下來將會從下載安裝,配置修改,收發消息等理論和實踐入手,帶大家一起探索 kafka 的核心組件,以及業務中常見的數據消費問題。
二、kafka下載與安裝
1.前提條件
由于 kafka 需要 JDK 環境來收發消息,并通過 ZooKeeper 協調服務,將 Producer,Consumer,Broker 等結合在一起,建立起生產者和消費者的訂閱關系,實現負載均衡。
所以安裝 kafka 之前,我們需要先:
- 安裝 JDK
- 安裝 Zookeeper
網上安裝教程很多,而本文主要探討 kafka,所以就不再這里給出 JDK 和 zk 的詳細安裝步驟了。
2.下載安裝
安裝 Kafka 時,主要有以下兩種方式(更推薦使用 docker 安裝):
- 虛機安裝官網下載 kafka 壓縮包 [https://kafka.apache.org/downloads],或者使用 docker 下載解壓縮至如下路徑 /opt/usr/kafka 目錄下。
- docker安裝(需先在虛機上安裝 docker):
# 拉取鏡像,默認最新版本
docker pull bitnami/kafka
# 創建網絡環境,保證zk和kafka在同一個網絡中
docker network create kafka-network
# 運行zookeper
docker run -d --name zookeeper --network kafka-network bitnami/zookeeper:latest
#運行kafka,其中:環境變量KAFKA_CFG_ZOOKEEPER_CONNECT指定ZooKeeper的連接信息,KAFKA_CFG_ADVERTISED_LISTENERS是Kafka對外的訪問地址
docker run -d --name kafka --network kafka-network \
-e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 \
-e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
-p 9092:9092 \
bitnami/kafka:latest
3.修改配置文件
進入目錄 /opt/usr/kafka/config,如果是 docker 安裝方式,需先用命令 docker exec -it containerID bash 進入容器,修改 server.properties 文件:
#broker.id屬性在kafka集群中必須要是唯?
broker.id=0
#kafka部署的機器ip和提供服務的端?號,根據自己服務器的網段修改IP
listeners=PLAINTEXT://192.168.65.60:9092
#kafka的消息存儲?件
log.dir=/opt/usr/data
#kafka連接zookeeper的地址,根據自己服務器的網段修改IP
zookeeper.connect=192.168.65.60:2181
三、啟動Kafka
1.啟動 kafka 服務器
進入 /opt/kafka/bin 目錄下,使用命令啟動:
./kafka-server-start.sh -daemon ../config/server.properties
使用 ps -ef |grep server.properties 命令查看是否啟動成功
2.啟動 Zookeeper
查看 zookeeper 是否正常添加好節點,首先,進入 zookeeper 的某一個容器內【這里進的是 zookeeper:zoo1 節點】
進入 bin 目錄下,使用 zkCli.sh 命令,啟動客戶端
3.判斷是否正常啟動
使用 ls /brokers/ids 命令查詢對應的 kafka broker:
如果看到有對應的 broker.id,如上圖的 1,2,3,就說明已經啟動成功了!
如果有啟動報錯,一般是 server.properties 配置文件有誤:比如,broker Id 不唯一,IP 端口不正確導致。
四、Kafka常見概念與核心組件
以下是 Kafka 中的一些核心組件:
名稱 | 解釋 |
Broker | Kafka 集群中的消息處理節點,?個 Kafka 節點就是?個 broker,broker.id 不能重復 |
Producer | 消息生產者,向 broker 發送消息的客戶端 |
Consumer | 消費者,從 broker 讀取消息的客戶端 |
Topic | 主題,Kafka 根據 topic 對消息進?歸類 |
Partition | 分區,將一個 topic 的消息存放到不同分區 |
Replication | 副本,分區的多個備份,備份分別存放在集群不同的 broker 中 |
1.主題Topic
(1) 什么是Topic
Topic 在 kafka 中是一個邏輯概念,kafka 通過 topic 將消息進行分類,消費者需通過 topic 來進行消費消息。
注意:發送到 Kafka 集群的每條消息都需要指定?個 topic,否則無法進行消費。
(2) 如何創建Topic
我們可以通過以下命令創建一個名為 hello-world 的 topic,在創建 topic 時可以指定分區數量和副本數量。
# 創建 topic
./kafka-topics.sh --create --zookeeper 172.16.30.34:2181 --replication-factor 1 --partitions 1 --topic hello-world
# 通過命令查看 zk 節點下所有的主題
./kafka-topics.sh --list --zookeeper 172.16.30.34:2181
以下是在 docker 容器里創建 topic 的例子:
(3) 查看 topic 的具體信息
我們可以通過以下命令來查看名為 my-replicated-topic 這個主題的詳細信息:
./kafka-topics.sh --describe --zookeeper 172.16.30.34:2181 --topic my-replicated-topic
可以看出該 topic 的名稱,分區數量,副本數量,以及配置信息等:
并且,我們也可以直接在 zookeeper 客戶端查看已創建的主題,通過以下命令查看:
# 進入客戶端
./bin/zkCli.sh
# 查看主題
ls /brokers/topics
get /brokers/topics/hello-world
可以看到,hello-world 主題已經被創建成功了:
2.Partition 分區
由于單機的 CPU、內存和磁盤等瓶頸,因此引入分區概念,類似于分布式系統的橫向擴展。
通過分區,一個 topic 的消息可以放在不同的分區上,好處是:
- 分離存儲:解決一個分區上日志存儲文件過大的問題;
- 提高性能:讀和寫可以同時在多個分區上進行,方便擴展和提升并發。
創建多分區的主題
以下命令創建一個名稱為 hello-world 的 topic,指定 zookeeper 內網節點地址為:172.16.30.34:2181(注意:如果在自己的內網機器上部署,這個地址需要改成自己的服務器 IP)。
--partitions 3:指定分區數量為 3
# 創建topic,replication-factor副本數為3,partitions分區數為1
./kafka-topics.sh --create --zookeeper 172.16.30.34:2181 --replication-factor 1 --partitions 3 --topic hello-world
3.Replication 副本
副本,就是主題中分區創建的多個備份,多個備份在 kafka 集群的多個 broker 中,會有一個 leader,多個 follower。
副本類似于冗余的意思,是保障系統高可用的有效應對方案。
指定副本數量
當新建主題時,除了可指定分區數,還可以指定副本數。
--replication-factor 3:指定副本數量為 3
# 創建topic,replication-factor副本數為3,partitions分區數為1
./kafka-topics.sh --create --zookeeper 172.16.30.34:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
五、在Kafka中收發消息
1.發送消息
當創建完 topic 之后,我們可以通過 kafka 安裝后自帶的客戶端工具 kafka-console-producer.sh,向已創建的主題中發消息:
# 打開hello-world主題的消息發送窗口
./kafka-console-producer.sh --bootstrap-server 172.16.30.34:49092 --topic hello-world
消息發送窗口打開后,向 hello-world 主題中發送消息:
2.消費消息
當消息發送成功后,我們新開一個窗口,通過 kafka 安裝后自帶的客戶端工具 kafka-console-consumer.sh 創建一個消費者,并監聽 hello-world 這個 topic,以消費消息:
# 打開hello-world主題的消息消費窗口
./kafka-console-consumer.sh --bootstrap-server 172.16.30.34:49092 --topic hello-world
在 kafka 中,消費者默認從當前主題的最后一條消息的 offset(偏移量位置)+1 位置開始監聽,所以當消費者開始監聽時,只能收到 topic 之后發送的消息:
從頭開始消費
這時,如果 topic 消息已經發送有一會了,但我們想要從頭開始消費該怎么辦呢?
只需要在開啟消費者監聽時,加一個 --from-beginning 命令即可:
# 從當前主題的第一條消息開始消費
./kafka-console-consumer.sh --bootstrap-server 172.16.30.34:49092 --from-beginning --topic hello-world
從第一條消息開始消費:
六、消息收發相關
1.消息的存儲和順序性
生產者將消息發給 broker,broker 會將消息保存在本地的日志文件中。
在 config 文件中,日志目錄為 /opt/usr/data,文件名為 主題-分區/00000000.log。
在存儲和消費消息時,kafka 會用 offset 來記錄當前消息的順序:
- 消息存儲有序:通過 offset 偏移量來描述消息的有序性;
- 消費有序:消費者消費消息時也是通過 offset 來描述當前要消費的消息位置。
2. 消費組
(1) 創建消費組
當創建消費者時,我們可以為消費者指定一個組別(group)。
--consuemr-property group.id=testGroup:指定 group 名稱為 testGroup
./kafka-console-consumer.sh --bootstrap-server 172.16.30.34:49092 --consuemr-property group.id=testGroup --topic hello-world
指定組別后,在消費消息時,同一個消費組 group 只有一個消費者可以收到訂閱的 topic 消息。
(2) 查看消費組信息
我們可以通過 describe 命令查看消費組信息,命令如下:
# 消費組testGroup的詳細信息
./kafka-consumer-groups.sh --bootstrap-server 172.16.30.34:49094 --describe --group testGroup
消費者信息如下:
我們需要關注的重點字段如下:
- CURRENT-OFFSET:最后被消費的消息偏移量(offset);
- LOG-END-OFFSET:消息總量(最后一條消息的偏移量);
- LAG:積壓了多少條消息。
在同一個消費組里面,任何一個消費者拿到了消息,都會改變上述的字段值。
3.單播/多播消息
當創建消費組后,我們根據消費組的個數來判斷消息是單播還是多播。這倆名詞源于網絡中的請求轉發,單播就是一對一發送消息,多播就是多個消費組同時消費消息。
# 注意,當兩個消費者都不指定消費組時,可以同時消費
./kafka-console-consumer.sh --bootstrap-server 172.16.30.34:49092 --topic hello-world
每次創建消費者時,如果沒有指定消費組,則相當于創建了一個默認消費組,kafka 會為這些默認消費組生成一個隨機的 group id。
所以多次創建默認消費組時,就是多播。
./kafka-console-consumer.sh --bootstrap-server 172.16.30.34:49092 --consuemr-property group.id=testGroup --topic hello-world
而單播消費時,只有一個消費組,所以 group_id 相同。
多播消費時,分別指定不同的消費組名稱或者不指定消費組名稱即可。
4.kafka消息日志文件
在 kafka 中,為了持久化數據,服務器創建了多個主題分區文件來保存消息,其中:
(1) 主題-分區/00000000.log 日志文件里保存了某個主題下的消息;
(2) Kafka 內部創建了 50 個分區 consumer-offsets-0 ~ 49,用來存放消費者消費某個 topic 的偏移量,這些偏移量由消費者消費 topic 的時候主動上報給 kafka。
- 提交到哪個分區由 hash 后取模得出:hash(consumerGroupId)% 50;
- 提交的內容為:key = consumerGroupId + topic + 分區號,value 為當前 offset 的值,為正整數。
在 Kafka 中,消費者的偏移量(consumer offset)是指消費者在分區中已經讀取到的位置。消費者偏移量是由 Kafka 自動管理的,以確保消費者可以在故障恢復后繼續從上次中斷的位置開始消費。
如果大家在日常業務時想要跳過某些不消費的消息,或者重復消費,可以使用 Kafka 提供的 kafka-consumer-groups.sh 腳本,來查看和修改消費者組的偏移量。
七、尾聲
1.小結
本文介紹了 Kafka 以其高速、高性能、高可靠性和高可用性在大數據領域中占據重要地位。
并且從下載安裝 Kafka 開始,到修改配置、服務啟動,通過命令行驗證其是否啟動成功。
接著,我們詳細介紹了 Kafka 的核心組件,包括 Broker、Producer、Consumer、Topic、Partition 和Replication。
然后特別強調了 Topic 的創建和管理,展示了如何創建 Topic、指定分區和副本數量,以及如何查看 Topic 的詳細信息。我們還講述了 Partition 分區的優勢,如分離存儲和提高性能,并解釋了 Replication 副本的概念和重要性。
接著,我們展示了在 Kafka 中發送和消費消息的過程,然后討論了消息存儲、順序性、消費組的創建和查看消費組信息,以及單播和多播消息的概念。
最后,文章提到了 Kafka 中消息日志文件保存的內容,包括消息本身和消息偏移量,以及如何修改消息偏移量的位置。
相信看了這部分內容,大家已經學會如何搭建自己的 kafka 消息隊列了~
2.后續
Kafka 系列文章分為上下篇,上篇主要是核心組件的介紹和實踐上手等內容,包含對 Kafka 做了一個全面介紹,包括安裝、配置、核心組件和消息收發機制,本文是上篇內容。
下篇內容主要討論集群高可用、消息重復消費、延時隊列等常見的高級用法,敬請期待。