消息隊列批量收發消息,請避開這五個坑!
大家好,我是君哥。
使用消息隊列時,為了提高生產和消費的性能,有時會開啟批量處理。
在生產端,生產者發送的消息先發送到一個消息列表,積累到一定的消息量之后再批量發送給 Broker,如下圖:
在消費端,消費者拉取消息后先不立即處理,而是把消息轉存到一個內存隊列或數據庫,由業務線程去處理,如下圖:
無論是生產者做批量發送,還是消費者做批量處理,都需要考慮使用批量消息的業務場景,避免踩坑。下面看一下批量操作可能會遇到哪些坑。
批量大小
當生產者采用批量發送的方式來提高發送性能時,一定要考慮發送消息的批量大小。下面是 RocketMQ 批量發送的官方示例:
String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
try {
producer.send(messages);
} catch (Exception e) {
e.printStackTrace();
//handle the error
}
RocketMQ 默認消息大小是 4M,由 maxMessageSize 參數控制,如果批量消息大小超過 maxMessageSize,則會拋出異常。
如果遇到消息大小超過 maxMessageSize 的情況時,可以用下面方法進行處理:
- 把這個參數改大,但需要考慮 Broker 的性能和網絡帶寬;
- 將消息進行拆分后分批發送;
- 對消息進行壓縮處理。
RabbitMQ 相關的 API 則提供了更加靈活的批量控制,對消息數量和消息大小都做了控制,下面看一下源碼:
冪等
消費端可以批量拉取消息進行消費,這樣可以減少拉取消息時的 RPC 次數,提升消費性能。比如在 RocketMQ 中,可以通過 Consumer 中的 pullBatchSize 來設置一次拉取的消息數量,通過 consumeMessageBatchMaxSize 參數來設置一次消費的消息數量。
但需要注意的是,如果批量消息中一條消息消費失敗了,這一批消息都需要進行重試,已經消費成功的消息會被重復消費,帶來業務問題。
為了不對業務造成影響,必須考慮冪等。一個簡單的方法是在消息中增加全局唯一 id 屬性,對消息消費結果進行記錄,消費成功后保存 id。這樣在消費消息之前先查詢是否存在消費成功的記錄,如果存在則直接返回處理成功。
時延
在使用消息隊列進行批量操作時,必須要考慮到時延問題。比如我們設置一個批次 100 條消息,積累夠 100 條消息后再發送,在消息量小的情況下,可能積累夠 100 條消息會很長時間,導致消費端拉取到一條消息時延很大。
雖然消息隊列的一個重要作用是削峰填谷,但在一些場景下,對消息的實時性也有要求。比如在車聯網的充電場景,車聯網平臺需要實時感知充電樁的狀態,如果充電樁積累夠一批消息再上報平臺,平臺獲取到的狀態會不準確,如果心跳消息延時太久,平臺會認為充電樁離線。
對于有時延要求又需要批量操作的場景,可以設置一個超時時間,超時后即使消息數量不夠,也會發送出去。看下 RabbitMQ 的處理:
public synchronized void send(String exchange, String routingKey, Message message, CorrelationData correlationData)
throws AmqpException {
if (correlationData != null) {
//...
super.send(exchange, routingKey, message, correlationData);
}
else {
if (this.scheduledTask != null) {
this.scheduledTask.cancel(false);
}
MessageBatch batch = this.batchingStrategy.addToBatch(exchange, routingKey, message);
if (batch != null) {
super.send(batch.getExchange(), batch.getRoutingKey(), batch.getMessage(), null);
}
//這里獲取到超時時間,到達超時時間后使用定時器將消息發送出去
Date next = this.batchingStrategy.nextRelease();
if (next != null) {
this.scheduledTask = this.scheduler.schedule((Runnable) () -> releaseBatches(), next);
}
}
}
可靠性
使用批處理一定要考慮可靠性的問題。
在消費端,消費者批量拉取一批消息后把消息暫存到一個內存臨時隊列,然后多線程去臨時隊列消費消息,如果服務宕機,臨時隊列中的消息會丟失。
為了避免宕機引發的損失,可以拉取一批消息后保存到數據庫,然后給 Broker 返回 ACK,之后業務代碼去數據庫查詢消息并消費,不過要考慮數據庫大事務、鎖競爭等問題。
當然,對于一些消息丟失不敏感的場景,比如日志收集之類的,可靠性這個指標是不用太關注的。
特殊場景
因為批量消息有一些復雜性,消息隊列的部分特性不支持。
事務消息
批量消息會增加消息重試的難度,所以對于事務消息,建議使用單條消息,一條消息對應一個事務。
順序消息
順序消息的實現思路一般是生產者將消息發送到同一個分區,消費者綁定這個分區并使用單線程消費這個分區的消息。如果對同一個 Topic 下的同一個分區來實現批量發送,難度會增大。所以建議順序消息使用單條消息進行發送。
延時消息
如果延時消息使用批量進行發送,這一批消息的延時時間必須相同,同時要考慮批量消息的超時時間,超時時間太大會影響延時時間的準確性,生產端實現復雜度大大增加。
總結
使用批量消息,在一定程度上可以提高性能和吞吐量,但是確實也會存在一些問題,使用的時候要結合業務場景避開這些坑。