
大家好,我是 華仔, 又跟大家見面了。
原文完整版在星球里面,如果感興趣可以掃文末二維碼加入。
上篇主要帶大家深度剖析了「號稱承載 Kafka 客戶端消息快遞倉庫 RecordAccmulator 的架構設計」,消息被暫存到累加器中,今天主要聊聊「發送網絡 I/O 的 Sender 線程的架構設計」,深度剖析下消息是如何被發送出去的。
這篇文章干貨很多,希望你可以耐心讀完。
一、總的概述
通過「場景驅動」的方式,來看看消息是如何從客戶端發送出去的。
在上篇中,我們知道了消息被暫存到 Deque<ProducerBatch> 的 batches 中,等「批次已滿」或者「有新批次被創建」后,喚醒 Sender 子線程將消息批量發送給 Kafka Broker 端。

接下來我們就來看看,「Sender 線程的架構實現以及發送處理流程」,為了方便大家理解,所有的源碼只保留骨干。
二、Sender 線程架構設計
在《圖解Kafka生產者初始化核心流程》這篇中我們知道 KafkaProducer 會啟動一個后臺守護進程,其線程名稱:kafka-producer-network-thread + "|" + clientId。
在 KafkaProducer.java 類有常量定義:NETWORK_THREAD_PREFIX,并啟動 守護線程 KafkaThread 即 ioThread,如果不主動關閉 Sender 線程會一直執行下去。
github 源碼地址如下:
?https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java?
?https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java?
?https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/clients/RequestCompletionHandler.java?
?https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java?
public class KafkaProducer<K, V> implements Producer<K, V> {
public static final String NETWORK_THREAD_PREFIX = "kafka-producer-network-thread";
// visible for testing
@SuppressWarnings("unchecked")
KafkaProducer(Map<String, Object> configs,Serializer<K> keySerializer,
Serializer<V> valueSerializer,ProducerMetadata metadata,
KafkaClient kafkaClient,ProducerInterceptors<K, V> interceptors,Time time) {
try {
...
this.sender = newSender(logContext, kafkaClient, this.metadata);
String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
...
log.debug("Kafka producer started");
} catch (Throwable t) {
...
}
}
}
從上面得出 Sender 類是一個線程類, 我們來看看 Sender 線程的重要字段和方法,并講解其是如何發送消息和處理消息響應的。
1、關鍵字段
/**
* The background thread that handles the sending of produce requests to the Kafka cluster. This thread makes metadata
* requests to renew its view of the cluster and then sends produce requests to the appropriate nodes.
*/
public class Sender implements Runnable {
/* the state of each nodes connection */
private final KafkaClient client; // 為 Sender 線程提供管理網絡連接進行網絡讀寫
/* the record accumulator that batches records */
private final RecordAccumulator accumulator; // 消息倉庫累加器
/* the metadata for the client */
private final ProducerMetadata metadata; // 生產者元數據
/* the flag indicating whether the producer should guarantee the message order on the broker or not. */
private final boolean guaranteeMessageOrder; // 是否保證消息在 broker 端的順序性
/* the maximum request size to attempt to send to the server */
private final int maxRequestSize; //發送消息最大字節數。
/* the number of acknowledgements to request from the server */
private final short acks; // 生產者的消息發送確認機制
/* the number of times to retry a failed request before giving up */
private final int retries; // 發送失敗后的重試次數,默認為0次
/* true while the sender thread is still running */
private volatile boolean running; // Sender 線程是否還在運行中
/* true when the caller wants to ignore all unsent/inflight messages and force close. */
private volatile boolean forceClose; // 是否強制關閉,此時會忽略正在發送中的消息。
/* the max time to wait for the server to respond to the request*/
private final int requestTimeoutMs; // 等待服務端響應的最大時間,默認30s
/* The max time to wait before retrying a request which has failed */
private final long retryBackoffMs; // 失敗重試退避時間
/* current request API versions supported by the known brokers */
private final ApiVersions apiVersions; // 所有 node 支持的 api 版本
/* all the state related to transactions, in particular the producer id, producer epoch, and sequence numbers */
private final TransactionManager transactionManager; // 事務管理,這里忽略 后續會有專門一篇講解事務相關的
// A per-partition queue of batches ordered by creation time for tracking the in-flight batches
private final Map<TopicPartition, List<ProducerBatch>> inFlightBatches; // 正在執行發送相關的消息批次集合, key為分區,value是 list<ProducerBatch> 。
從該類屬性字段來看比較多,這里說幾個關鍵字段:
- client:KafkaClient 類型,是一個接口類,Sender 線程主要用它來實現真正的網絡I/O,即 NetworkClient。該字段主要為 Sender 線程提供了網絡連接管理和網絡讀寫操作能力。
- accumulator:RecordAccumulator類型,上一篇的內容 圖解 Kafka 源碼之快遞倉庫 RecordAccumulator 架構設計,Sender 線程用它獲取待發送的 node 節點及批次消息等能力。
- metadata:ProducerMetadata類型,生產者元數據。因為發送消息時要知道分區 Leader 在哪些節點,以及節點的地址、主題分區的情況等。
- guaranteeMessageOrder:是否保證消息在 broker 端的順序性,參數:max.in.flight.requests.per.connection。
- maxRequestSize:單個請求發送消息最大字節數,默認為1M,它限制了生產者在單個請求發送的記錄數,以避免發送大量請求。
- acks:生產者的消息發送確認機制。有3個可選值:0,1,-1/all。
- retries:生產者發送失敗后的重試次數。默認是0次。
- running:Sender線程是否還在運行中。
- forceClose:是否強制關閉,此時會忽略正在發送中的消息。
- requestTimeoutMs:生產者發送請求后等待服務端響應的最大時間。如果超時了且配置了重試次數,會再次發送請求,待重試次數用完后在這個時間范圍內返回響應則認為請求最終失敗,默認 30 秒。
- retryBackoffMs:生產者在發送請求失敗后可能會重新發送失敗的請求,其目的就是防止重發過快造成服務端壓力過大。默認100 ms。
- apiVersions:ApicVersions類對象,保存了每個node所支持的api版本。
- inFlightBatches:正在執行發送相關的消息批次集合, key為分區,value是 list<|ProducerBatch>。
2、關鍵方法
Sender 類的方法也不少,這里針對關鍵方法逐一講解下。
(1)run()
Sender 線程實現了 Runnable 接口,會不斷的調用 runOnce(),這是一個典型的循環事件機制。
/**
* The main run loop for the sender thread
*/
@Override
public void run() {
log.debug("Starting Kafka producer I/O thread.");
// 這里使用 volatile boolean 類型的變量 running,判斷 Sender 線程是否在運行狀態中。
// 1. 如果 Sender 線程在運行狀態即 running=true,則一直循環調用 runOnce() 方法。
while (running) {
try {
// 將緩沖區的消息發送到 broker。
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
// 2. 如果(沒有強制關閉 && ((消息累加器中還有剩余消息待發送 || 還有等待未響應的消息 ) || 還有事務請求未完成)),則繼續發送剩下的消息。
while (!forceClose && ((this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0) || hasPendingTransactionalRequests())) {
try {
// 繼續執行將剩余的消息發送完畢
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
// 3. 對進行中的事務進行中斷,則繼續發送剩下的消息。
while (!forceClose && transactionManager != null && transactionManager.hasOngoingTransaction()) {
if (!transactionManager.isCompleting()) {
log.info("Aborting incomplete transaction due to shutdown");
transactionManager.beginAbort();
}
try {
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
// 4. 如果強制關閉,則關閉事務管理器、終止消息的追加并清空未完成的批次
if (forceClose) {
if (transactionManager != null) {
log.debug("Aborting incomplete transactional requests due to forced shutdown");
// 關閉事務管理器
transactionManager.close();
}
log.debug("Aborting incomplete batches due to forced shutdown");
// 終止消息的追加并清空未完成的批次
this.accumulator.abortIncompleteBatches();
}
// 5. 關閉 NetworkClient
try {
this.client.close();
} catch (Exception e) {
log.error("Failed to close network client", e);
}
log.debug("Shutdown of Kafka producer I/O thread has completed.");
}
當 Sender 線程啟動后會直接運行 run() 方法,該方法在 4種情況下會一直循環調用去發送消息到 Broker。
(2)runOnce()
我們來看看這個執行業務處理的方法,關于事務的部分后續專門文章講解。
/**
* Run a single iteration of sending
*/
void runOnce() {
if (transactionManager != null) {
... //事務處理方法 后續文章專門講解
}
// 1. 獲取當前時間的時間戳。
long currentTimeMs = time.milliseconds();
// 2. 調用 sendProducerData 發送消息,但并非真正的發送,而是把消息緩存在 把消息緩存在 KafkaChannel 的 Send 字段里。下一篇會講解 NetworkClient。
long pollTimeout = sendProducerData(currentTimeMs);
// 3. 讀取消息實現真正的網絡發送
client.poll(pollTimeout, currentTimeMs);
}
該方法比較簡單,主要做了3件事情:
- 獲取當前時間的時間戳。
- 調用 sendProducerData 發送消息,但并非真正的發送,而是把消息緩存在 NetworkClient 的 Send 字段里。下一篇會講解 NetworkClient。
- 讀取 NetworkClient 的 send 字段消息實現真正的網絡發送。
(3)sendProducerData()
該方法主要是獲取已經準備好的節點上的批次數據并過濾過期的批次集合,最后暫存消息。
private long sendProducerData(long now) {
// 1. 獲取元數據
Cluster cluster = metadata.fetch();
// get the list of partitions with data ready to send
// 2. 獲取已經準備好的節點以及找不到 Leader 分區對應的節點的主題
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
// if there are any partitions whose leaders are not known yet, force metadata update
// 3. 如果主題 Leader 分區對應的節點不存在,則強制更新元數據
if (!result.unknownLeaderTopics.isEmpty()) {
// 添加 topic 到沒有拉取到元數據的 topic 集合中,并標識需要更新元數據
for (String topic : result.unknownLeaderTopics)
this.metadata.add(topic, now);
...
// 針對這個 topic 集合進行元數據更新
this.metadata.requestUpdate();
}
// 4. 循環 readyNodes 并檢查客戶端與要發送節點的網絡是否已經建立好了
Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
Node node = iter.next();
// 檢查客戶端與要發送節點的網絡是否已經建立好了
if (!this.client.ready(node, now)) {
// 移除對應節點
iter.remove();
notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
}
}
// create produce requests
// 5. 獲取上面返回的已經準備好的節點上要發送的 ProducerBatch 集合
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
// 6. 將從消息累加器中讀取的數據集,放入正在執行發送相關的消息批次集合中
addToInflightBatches(batches);
// 7. 要保證消息的順序性,即參數 max.in.flight.requests.per.cnotallow=1
if (guaranteeMessageOrder) {
// Mute all the partitions drained
for (List<ProducerBatch> batchList : batches.values()) {
for (ProducerBatch batch : batchList)
// 對 tp 進行 mute,保證只有一個 batch 正在發送
this.accumulator.mutePartition(batch.topicPartition);
}
}
// 重置下一批次的過期時間
accumulator.resetNextBatchExpiryTime();
// 8. 從正在執行發送數據集合 inflightBatches 中獲取過期集合
List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
// 9. 從 batches 中獲取過期集合
List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
// 10. 從 inflightBatches 與 batches 中查找已過期的消息批次(ProducerBatch),判斷批次是否過 期是根據系統當前時間與 ProducerBatch 創建時間之差是否超過120s,過期時間可以通過參數 delivery.timeout.ms 設置。
expiredBatches.addAll(expiredInflightBatches);
// 如果過期批次不為空 則輸出對應日志
if (!expiredBatches.isEmpty())
log.trace("Expired {} batches in accumulator", expiredBatches.size());
// 11. 處理已過期的消息批次,通知該批消息發送失敗并返回給客戶端
for (ProducerBatch expiredBatch : expiredBatches) {
// 處理當前過期ProducerBatch的回調結果 ProduceRequestResult,并且設置超時異常 new TimeoutException(errorMessage)
String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
+ ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
// 通知該批消息發送失敗并返回給客戶端
failBatch(expiredBatch, -1, NO_TIMESTAMP, new TimeoutException(errorMessage), false);
// ... 事務管理器的處理忽略
}
// 收集統計指標,后續會專門對 Kafka 的 Metrics 進行分析
sensors.updateProduceRequestMetrics(batches);
// 設置下一次的發送延時
long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
pollTimeout = Math.max(pollTimeout, 0);
if (!result.readyNodes.isEmpty()) {
log.trace("Nodes with data ready to send: {}", result.readyNodes);
pollTimeout = 0;
}
// 12. 發送消息暫存到 NetworkClient send 字段里。
sendProduceRequests(batches, now);
return pollTimeout;
}
該方法主要做了12件事情,逐一說明下:
- 首先獲取元數據,這里主要是根據元數據的更新機制來保證數據的準確性。
- 獲取已經準備好的節點。accumulator#reay() 方法會通過發送記錄對應的節點和元數據進行比較,返回結果中包括兩個重要的集合:「準備好發送的節點集合 readyNodes」、「找不到 Leader 分區對應節點的主題 unKnownLeaderTopic」。
- 如果主題 Leader 分區對應的節點不存在,則強制更新元數據。
- 循環 readyNodes 并檢查客戶端與要發送節點的網絡是否已經建立好了。在 NetworkClient 中維護了客戶端與所有節點的連接,這樣就可以通過連接的狀態判斷是否連接正常。
- 獲取上面返回的已經準備好的節點上要發送的 ProducerBatch 集合。accumulator#drain() 方法就是將 「TopicPartition」-> 「ProducerBatch 集合」的映射關系轉換成 「Node 節點」->「ProducerBatch 集合」的映射關系,如下圖所示,這樣的話按照節點方式只需要2次就完成,大大減少網絡的開銷。

- 將從消息累加器中讀取的數據集,放入正在執行發送相關的消息批次集合中。
- 要保證消息的順序性,即參數 max.in.flight.requests.per.cnotallow=1,會添加到 muted 集合,保證只有一個 batch 在發送。
- 從正在執行發送數據集合 inflightBatches 中獲取過期集合。
- 從 accumulator 累加器的 batches 中獲取過期集合。
- 從 inflightBatches 與 batches 中查找已過期的消息批次(ProducerBatch),判斷批次是否過期是根據系統當前時間與 ProducerBatch 創建時間之差是否超過120s,過期時間可以通過參數 delivery.timeout.ms 設置。
- 處理已過期的消息批次,通知該批消息發送失敗并返回給客戶端。
- 發送消息暫存到 NetworkClient send 字段里。
從上面源碼可以看出,SendProducerData 方法中調用到了 Sender 線程類中多個方法,這里就不一一講解了。
三、Sender 發送流程分析
通過前兩部分的源碼解讀和剖析,我們可以得出 Sender 線程的處理流程可以分為兩大部分:「發送請求」、「接收響應結果」。
1、發送請求
從 runOnce 方法可以得出發送請求也分兩步:「消息預發送」、「真正的網絡發送」。
void runOnce() {
// 1. 把消息緩存在 KafkaChannel 的 Send 字段里。
long pollTimeout = sendProducerData(currentTimeMs);
// 2. 讀取消息實現真正的網絡發送
client.poll(pollTimeout, currentTimeMs);
}
2、接收響應結果
等 Sender 線程收到 Broker 端的響應結果后,會根據響應結果分情況進行處理。
3、時序圖
原文完整版在星球里面,如果感興趣可以掃文末二維碼加入。
四、總結
這里,我們一起來總結一下這篇文章的重點。
1、開篇總述消息被暫存到 Deque<ProducerBatch> 的 batches 中,等「批次已滿」或者「有新批次被創建」后,喚醒 Sender 子線程將消息批量發送給 Kafka Broker 端,從而引出「Sender 線程」。
2、帶你深度剖析了「Sender 線程」 的發送消息以及響應處理的相關方法。
3、最后帶你串聯了整個消息發送的流程,讓你有個更好的整體認知。