RocketMQ 如何保證發(fā)送消息不丟失?
在 RocketMQ 中,有 3種簡單的消息發(fā)送方式:同步發(fā)送、異步發(fā)送和單向發(fā)送。這篇文章,我們將詳細(xì)分析這三種發(fā)送方式的原理、優(yōu)缺點(diǎn)、使用場景以及使用該方式是否會丟失數(shù)據(jù)。
本文源碼基于: Apache RocketMQ release-5.2.0
一、同步發(fā)送
1.原理分析
在同步發(fā)送模式下,RocketMQ 默認(rèn)采用同步刷盤方式,當(dāng)生產(chǎn)者將消息發(fā)送到 Broker 后,會等待 Broker 的響應(yīng)(默認(rèn)超時 5分鐘),Broker 接收消息后,會將其寫入內(nèi)存緩存,并進(jìn)行刷盤操作。因此,如果 Broker 響應(yīng)成功,代表消息一定成功寫入磁盤。
同步發(fā)送主要涉及以下幾個步驟:
- 創(chuàng)建Producer:創(chuàng)建一個Producer對象;
- 創(chuàng)建消息:創(chuàng)建一個Message對象,設(shè)置Topic、Tag標(biāo)簽和消息體;
- 發(fā)送消息:調(diào)用DefaultMQProducer的send方法;
- 等待響應(yīng):發(fā)送方會阻塞等待服務(wù)器的響應(yīng),直到收到確認(rèn)消息;
如下示例代碼為一個完整的同步發(fā)送流程:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
public class SyncProducerTest {
public static void main(String[] args) throws Exception {
// 1、創(chuàng)建 producer,設(shè)置組名為 SyncGroupTest
DefaultMQProducer producer = new DefaultMQProducer("SyncGroup");
// 2、指定 NameServer的地址,以獲取 Broker路由地址
producer.setNamesrvAddr("x.x.x.x:9876");
// 3、啟動 producer
producer.start();
// 4、創(chuàng)建消息,并指定 Topic,Tag和消息體
Message msg = new Message("SyncTopic", "sync", "SyncMessage".getBytes("UTF-8"));
// 5、發(fā)送同步消息
SendResult sendResult = producer.send(msg);
// 6、通過 sendResult 判斷消息是否成功送達(dá)
System.out.printf("message send result:" + sendResult);
// 7、關(guān)閉 Producer
producer.shutdown();
}
}
RocketMQ 的同步發(fā)送主要涉及以下幾個關(guān)鍵源碼類和方法:
- DefaultMQProducer:生產(chǎn)者類,負(fù)責(zé)發(fā)送消息。
- MQClientAPIImpl#sendMessage:底層消息發(fā)送實(shí)現(xiàn)。
- NettyRemotingClient#invokeSync:通過 Netty 實(shí)現(xiàn)網(wǎng)絡(luò)通信。
- Broker 端的 SendMessageProcessor:處理發(fā)送請求。
源碼參考:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send(Message msg)
2.優(yōu)缺點(diǎn)
優(yōu)點(diǎn):
- 簡單易用。
- 可靠性高,發(fā)送方可以確認(rèn)消息是否成功發(fā)送,一旦發(fā)送成功,消息就已經(jīng)寫入磁盤,消息不會丟失。
缺點(diǎn):
- 延遲較高,需要等待服務(wù)器的響應(yīng)。
- 吞吐量可能受限于網(wǎng)絡(luò)延遲和服務(wù)器性能。
3.使用場景
適用于對消息可靠性要求較高的場景,如訂單系統(tǒng)、金融交易、重要的消息通知等。
二、異步發(fā)送
1.原理分析
在異步發(fā)送模式下,RocketMQ 默認(rèn)采用異步刷盤方式,當(dāng)生產(chǎn)者發(fā)送消息到 Broker 后,消息寫入內(nèi)存緩存成功后,Broker 立即返回響應(yīng)(默認(rèn)超時 5分鐘),后臺線程再異步將消息批量寫入磁盤。因此,這種方式提高了系統(tǒng)的吞吐量和性能,但在系統(tǒng)崩潰時可能會丟失部分未刷盤的消息。
異步發(fā)送主要涉及以下幾個步驟:
- 創(chuàng)建Producer:創(chuàng)建一個Producer對象;
- 創(chuàng)建消息:同樣創(chuàng)建一個Message對象。
- 發(fā)送消息:調(diào)用DefaultMQProducer的send方法,傳遞一個SendCallback回調(diào)對象。
- 處理響應(yīng):回調(diào)函數(shù)會在消息發(fā)送成功或失敗時被調(diào)用。
如下示例代碼為一個完整的異步發(fā)送流程:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
public class AsyncProducerTest {
public static void main(String[] args) throws Exception {
// 1、創(chuàng)建 producer,設(shè)置組名為 AsyncGroupTest
DefaultMQProducer producer = new DefaultMQProducer("AsyncGroup");
// 2、指定 NameServer的地址,以獲取 Broker路由地址
producer.setNamesrvAddr("x.x.x.x:9876");
// 3、啟動 producer
producer.start();
// 4、創(chuàng)建消息,并指定Topic,Tag和消息體
Message msg = new Message("AsyncTopic","async", "AsyncMessage".getBytes("UTF-8"));
// 5、發(fā)送異步消息,SendCallback是處理異步回調(diào)的方法
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) { // 成功回調(diào)
System.out.println("message send success: " + sendResult);
}
@Override
public void onException(Throwable throwable) { // 失敗回調(diào)
System.out.println("message send fail: " + throwable);
}
});
// 6、關(guān)閉 Producer
producer.shutdown();
}
}
RocketMQ 的異步發(fā)送主要涉及以下幾個關(guān)鍵源碼類和方法:
- DefaultMQProducer:生產(chǎn)者類,負(fù)責(zé)發(fā)送消息。
- MQClientAPIImpl#sendMessage:底層消息發(fā)送實(shí)現(xiàn)。
- NettyRemotingClient#invokeAsync:通過 Netty 實(shí)現(xiàn)網(wǎng)絡(luò)通信。
- Broker 端的 SendMessageProcessor:處理發(fā)送請求。
源碼參考:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send(Message msg, SendCallback sendCallback)
2.優(yōu)缺點(diǎn)
優(yōu)點(diǎn):
- 非阻塞,發(fā)送方可以繼續(xù)執(zhí)行其他任務(wù),提高吞吐量。
- 延遲較低,適用于對響應(yīng)時間敏感的場景。
缺點(diǎn):
- 實(shí)現(xiàn)復(fù)雜度較高,需要處理異步回調(diào)。
- 可靠性相對降低,需要處理失敗重試等問題。
- 無法保證發(fā)送出去的數(shù)據(jù)不丟失。
3.使用場景
適用于對響應(yīng)時間要求較高的場景,如實(shí)時數(shù)據(jù)處理、日志采集、消費(fèi)信息的推送等。
三、單向發(fā)送
1.原理分析
單向(OneWay)發(fā)送是一種只負(fù)責(zé)發(fā)送消息而不等待任何響應(yīng)的方式。生產(chǎn)者將消息發(fā)送到 Broker 后(默認(rèn)超時 5分鐘),不關(guān)心消息是否成功到達(dá)或被持久化,主要依賴 Broker 進(jìn)行刷盤操作,單向發(fā)送通常與異步刷盤結(jié)合使用,以提高發(fā)送效率。
單向發(fā)送主要涉及以下幾個步驟:
- 創(chuàng)建Producer:創(chuàng)建一個Producer對象;
- 創(chuàng)建消息:創(chuàng)建一個Message對象。
- 發(fā)送消息:調(diào)用DefaultMQProducer的sendOneway方法。
如下示例代碼為一個完整的單向發(fā)送流程:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
public class OneWayProducerTest {
public static void main(String[] args) throws Exception {
// 1、創(chuàng)建 producer,設(shè)置組名為 OneWayGroupTest
DefaultMQProducer producer = new DefaultMQProducer("OneWayGroup");
// 2、指定 NameServer的地址,以獲取 Broker路由地址
producer.setNamesrvAddr("x.x.x.x:9876");
// 3、啟動 producer
producer.start();
// 4、創(chuàng)建消息,并指定Topic,Tag和消息體
Message msg = new Message("OneWayTopic","oneway", "OneWayMessage".getBytes("UTF-8"));
// 5、發(fā)送單向消息
producer.sendOneway(msg);
// 6、關(guān)閉 Producer
producer.shutdown();
}
}
RocketMQ 的單向發(fā)送主要涉及以下幾個關(guān)鍵類和方法:
- DefaultMQProducer:生產(chǎn)者類,負(fù)責(zé)發(fā)送消息。
- MQClientAPIImpl#sendMessage:底層消息發(fā)送實(shí)現(xiàn)。
- NettyRemotingClient#invokeOneway:通過 Netty 實(shí)現(xiàn)網(wǎng)絡(luò)通信。
- Broker 端的 SendMessageProcessor:處理發(fā)送請求。
源碼參考:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendOneway(Message msg)
2.優(yōu)缺點(diǎn)
優(yōu)點(diǎn):
- 非常高效,延遲最低。
- 適用于對可靠性要求不高的場景。
缺點(diǎn):
- 無法確認(rèn)消息是否成功發(fā)送。
- 可靠性最低,消息可能丟失。
3.使用場景
適用于對可靠性要求不高的場景,如日志收集、監(jiān)控數(shù)據(jù)上報等。
三種方式對比
發(fā)送方式 | 優(yōu)點(diǎn) | 缺點(diǎn) | 使用場景 |
同步發(fā)送 | 可靠性高,簡單易用 | 延遲較高,吞吐量受限 | 訂單系統(tǒng)、金融交易、重要的消息通知等 |
異步發(fā)送 | 非阻塞,延遲較低 | 實(shí)現(xiàn)復(fù)雜度高,可靠性相對降低 | 實(shí)時數(shù)據(jù)處理、日志采集、消費(fèi)信息的推送等 |
單向發(fā)送 | 高效,延遲最低 | 無法確認(rèn)消息是否成功發(fā)送,可靠性最低 | 日志收集、監(jiān)控數(shù)據(jù)上報等 |
如何選擇?
- 同步發(fā)送:消息發(fā)送后會等待服務(wù)器的響應(yīng),整個過程業(yè)務(wù)是阻塞等待的,適用于對可靠性要求高的場景,比如 訂單系統(tǒng)、金融交易等。
- 異步發(fā)送:消息發(fā)送后,不等待服務(wù)器響應(yīng),而是通過回調(diào)函數(shù)處理響應(yīng),適用于對響應(yīng)時間要求高的場景,比如實(shí)時數(shù)據(jù)處理、日志采集、消費(fèi)信息的推送等
- 單向發(fā)送::單向發(fā)送只負(fù)責(zé)發(fā)送消息而不等待任何響應(yīng)的方式,也不需要對發(fā)送的狀態(tài)、結(jié)果負(fù)責(zé),適用于對可靠性要求不高的場景,比如日志收集、監(jiān)控數(shù)據(jù)上報等。
每種發(fā)送方式都有其適用的場景和優(yōu)缺點(diǎn),具體如何選擇,一定需要根據(jù)業(yè)務(wù)需求進(jìn)行權(quán)衡。
總結(jié)
本文分析了 RocketMQ 同步發(fā)送、異步發(fā)送和單向發(fā)送三種方式的原理、優(yōu)缺點(diǎn)以及使用場景,并且分析了每種方式涉及到的核心源碼。
通過上文的介紹可以知道同步發(fā)送方式可以保證消息發(fā)送時不丟,但是性能相對其他兩種方式差一些。
RocketMQ 是一款優(yōu)秀的開源消息中間件,作為 Java程序員,建議多去閱讀它的源碼,吸收其中比較好的代碼思維。