成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

消息隊列批量收發消息,請避開這五個坑!

開發 架構
使用批量消息,在一定程度上可以提高性能和吞吐量,但是確實也會存在一些問題,使用的時候要結合業務場景避開這些坑。

大家好,我是君哥。

使用消息隊列時,為了提高生產和消費的性能,有時會開啟批量處理。

在生產端,生產者發送的消息先發送到一個消息列表,積累到一定的消息量之后再批量發送給 Broker,如下圖:

在消費端,消費者拉取消息后先不立即處理,而是把消息轉存到一個內存隊列或數據庫,由業務線程去處理,如下圖:

無論是生產者做批量發送,還是消費者做批量處理,都需要考慮使用批量消息的業務場景,避免踩坑。下面看一下批量操作可能會遇到哪些坑。

批量大小

當生產者采用批量發送的方式來提高發送性能時,一定要考慮發送消息的批量大小。下面是 RocketMQ 批量發送的官方示例:

String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
try {
    producer.send(messages);
} catch (Exception e) {
    e.printStackTrace();
    //handle the error
}

RocketMQ 默認消息大小是 4M,由 maxMessageSize 參數控制,如果批量消息大小超過 maxMessageSize,則會拋出異常。

如果遇到消息大小超過 maxMessageSize 的情況時,可以用下面方法進行處理:

  • 把這個參數改大,但需要考慮 Broker 的性能和網絡帶寬;
  • 將消息進行拆分后分批發送;
  • 對消息進行壓縮處理。

RabbitMQ 相關的 API 則提供了更加靈活的批量控制,對消息數量和消息大小都做了控制,下面看一下源碼:

冪等

消費端可以批量拉取消息進行消費,這樣可以減少拉取消息時的 RPC 次數,提升消費性能。比如在 RocketMQ 中,可以通過 Consumer 中的 pullBatchSize 來設置一次拉取的消息數量,通過 consumeMessageBatchMaxSize 參數來設置一次消費的消息數量。

但需要注意的是,如果批量消息中一條消息消費失敗了,這一批消息都需要進行重試,已經消費成功的消息會被重復消費,帶來業務問題。

為了不對業務造成影響,必須考慮冪等。一個簡單的方法是在消息中增加全局唯一 id 屬性,對消息消費結果進行記錄,消費成功后保存 id。這樣在消費消息之前先查詢是否存在消費成功的記錄,如果存在則直接返回處理成功。

時延

在使用消息隊列進行批量操作時,必須要考慮到時延問題。比如我們設置一個批次 100 條消息,積累夠 100 條消息后再發送,在消息量小的情況下,可能積累夠 100 條消息會很長時間,導致消費端拉取到一條消息時延很大。

雖然消息隊列的一個重要作用是削峰填谷,但在一些場景下,對消息的實時性也有要求。比如在車聯網的充電場景,車聯網平臺需要實時感知充電樁的狀態,如果充電樁積累夠一批消息再上報平臺,平臺獲取到的狀態會不準確,如果心跳消息延時太久,平臺會認為充電樁離線。

對于有時延要求又需要批量操作的場景,可以設置一個超時時間,超時后即使消息數量不夠,也會發送出去。看下 RabbitMQ 的處理:

public synchronized void send(String exchange, String routingKey, Message message, CorrelationData correlationData)
  throws AmqpException {
 if (correlationData != null) {
  //...
  super.send(exchange, routingKey, message, correlationData);
 }
 else {
  if (this.scheduledTask != null) {
   this.scheduledTask.cancel(false);
  }
  MessageBatch batch = this.batchingStrategy.addToBatch(exchange, routingKey, message);
  if (batch != null) {
   super.send(batch.getExchange(), batch.getRoutingKey(), batch.getMessage(), null);
  }
  //這里獲取到超時時間,到達超時時間后使用定時器將消息發送出去
  Date next = this.batchingStrategy.nextRelease();
  if (next != null) {
   this.scheduledTask = this.scheduler.schedule((Runnable) () -> releaseBatches(), next);
  }
 }
}

可靠性

使用批處理一定要考慮可靠性的問題。

在消費端,消費者批量拉取一批消息后把消息暫存到一個內存臨時隊列,然后多線程去臨時隊列消費消息,如果服務宕機,臨時隊列中的消息會丟失。

為了避免宕機引發的損失,可以拉取一批消息后保存到數據庫,然后給 Broker 返回 ACK,之后業務代碼去數據庫查詢消息并消費,不過要考慮數據庫大事務、鎖競爭等問題。

當然,對于一些消息丟失不敏感的場景,比如日志收集之類的,可靠性這個指標是不用太關注的。

特殊場景

因為批量消息有一些復雜性,消息隊列的部分特性不支持。

事務消息

批量消息會增加消息重試的難度,所以對于事務消息,建議使用單條消息,一條消息對應一個事務。

順序消息

順序消息的實現思路一般是生產者將消息發送到同一個分區,消費者綁定這個分區并使用單線程消費這個分區的消息。如果對同一個 Topic 下的同一個分區來實現批量發送,難度會增大。所以建議順序消息使用單條消息進行發送。

延時消息

如果延時消息使用批量進行發送,這一批消息的延時時間必須相同,同時要考慮批量消息的超時時間,超時時間太大會影響延時時間的準確性,生產端實現復雜度大大增加。

總結

使用批量消息,在一定程度上可以提高性能和吞吐量,但是確實也會存在一些問題,使用的時候要結合業務場景避開這些坑。

責任編輯:姜華 來源: 君哥聊技術
相關推薦

2022-07-26 20:00:35

場景RabbitMQMQ

2020-09-14 11:50:21

SpringBootRabbitMQJava

2017-07-28 09:30:55

2017-10-11 15:08:28

消息隊列常見

2025-03-28 10:06:01

架構輪詢延時

2023-09-26 08:20:12

消息隊列RabbitMQ

2022-08-22 08:45:57

Kafka網絡層源碼實現

2015-08-12 10:10:21

2020-10-09 15:00:56

實時消息編程語言

2025-03-28 12:20:00

代碼C#異步編程

2020-10-10 12:46:17

編程指南誤區

2016-08-24 15:43:01

2019-11-19 08:35:09

數據數據準備自動化

2019-07-19 07:56:13

消息隊列消息代理消息中間件

2021-02-19 09:19:11

消息隊列場景

2009-11-09 11:15:06

WCF消息隊列

2010-04-13 17:00:43

Unix消息隊列

2010-04-21 12:12:56

Unix 消息隊列

2012-09-24 11:48:05

IBMdw

2025-04-09 08:20:00

RocketMQ消息隊列開發
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 日本一区二区三区四区 | 99国产精品久久久久老师 | 国产一区二区三区四区 | 精品久 | 成人免费福利视频 | 欧美一二区 | 91视频在线| 欧美亚洲综合久久 | 成人av电影网 | 天天色影视综合 | 色天天综合 | 亚洲人成人一区二区在线观看 | 亚洲一区二区三区免费在线观看 | 亚洲精品乱码久久久久久按摩观 | 国产精品一区二区精品 | 美女福利视频一区 | av中文字幕在线 | 久久久夜| 国产精品乱码一区二三区小蝌蚪 | 精品二区 | 欧美亚洲一区二区三区 | 亚洲不卡在线视频 | 日韩精品一区二区三区在线 | 欧美亚洲激情 | 天堂一区在线观看 | 欧美一级免费看 | 国产精品久久久久久久久大全 | 在线观看免费av网 | 神马影院一区二区三区 | 91av导航 | 国产精品国产精品国产专区不卡 | 精品国产乱码久久久久久丨区2区 | 中文字幕亚洲专区 | 国产精品性做久久久久久 | 成人久久久久久久久 | 操操日 | 99久久久无码国产精品 | 亚洲天堂精品久久 | 精品无码久久久久久国产 | 日本色综合 | 国产不卡一 |