美團(tuán)面試:對(duì)比分析 RocketMQ、Kafka、RabbitMQ 三大MQ常見(jiàn)問(wèn)題?
三大MQ指標(biāo)對(duì)比
分布式、微服務(wù)、高并發(fā)架構(gòu)中,消息隊(duì)列(Message Queue,簡(jiǎn)稱MQ)扮演著至關(guān)重要的角色。
消息隊(duì)列用于實(shí)現(xiàn)系統(tǒng)間的異步通信、解耦、削峰填谷等功能。
對(duì)比指標(biāo) | RabbitMQ | RocketMQ | Kafka |
應(yīng)用場(chǎng)景 | 中小規(guī)模應(yīng)用場(chǎng)景 | 分布式事務(wù)、實(shí)時(shí)日志處理 | 大規(guī)模數(shù)據(jù)處理、實(shí)時(shí)流處理 |
開(kāi)發(fā)語(yǔ)言 | Erlang | Java | Scala & Java |
消息可靠性 | 最高 (AMQP協(xié)議保證) | 較高 (基于事務(wù)保證) | 中等 (基于副本機(jī)制保證) |
消息吞吐量 | 低 萬(wàn)級(jí)到十萬(wàn)級(jí) | 中等 十萬(wàn)級(jí)到百萬(wàn)級(jí) | 高 百萬(wàn)級(jí)或更高 |
時(shí)效性 | 毫秒級(jí) | 毫秒級(jí) | 毫秒級(jí) |
支持的語(yǔ)言和平臺(tái) | Java、C++、Python等 | Java、C++、Go等 | Java、Scala、Python等 |
架構(gòu)模型 | virtual host、broker、exchange、queue | nameserver、controller、broker | broker、topic、partition、zookeeper/Kraft |
社區(qū)活躍度和生態(tài)建設(shè) | 中等 活躍的開(kāi)源社區(qū)和豐富的插件生態(tài)系統(tǒng) | 較高 阿里巴巴開(kāi)源,穩(wěn)定的社區(qū)支持 | 最高 活躍的開(kāi)源社區(qū)和廣泛的應(yīng)用 |
github star | 10.8k | 19.4k | 25.2k |
對(duì)比分析三大MQ常見(jiàn)問(wèn)題
下面, 對(duì)比分析三大MQ常見(jiàn)問(wèn)題。
消息丟失問(wèn)題
image-20250508192254071
1、RocketMQ解決消息丟失問(wèn)題
生產(chǎn)端: 采用同步發(fā)送(等待Broker確認(rèn))并啟用重試機(jī)制,結(jié)合事務(wù)消息(如預(yù)提交half消息+二次確認(rèn)commit)確保消息可靠投遞。
Broker端:配置同步刷盤(pán)(消息寫(xiě)入磁盤(pán)后返回確認(rèn))和多副本同步機(jī)制(主從節(jié)點(diǎn)數(shù)據(jù)冗余)防止宕機(jī)丟失,同時(shí)通過(guò)集群容災(zāi)保障高可用。
消費(fèi)端:消費(fèi)者需手動(dòng)ACK確認(rèn),失敗時(shí)觸發(fā)自動(dòng)重試(默認(rèn)16次),最終失敗消息轉(zhuǎn)入死信隊(duì)列人工處理,避免異常場(chǎng)景下消息丟失。
2、Kafka解決消息丟失問(wèn)題
生產(chǎn)端:設(shè)置acks=all
確保消息被所有副本持久化后才響應(yīng),啟用生產(chǎn)者重試(retries
)及冪等性(enable.idempotence=true
)防止網(wǎng)絡(luò)抖動(dòng)或Broker異常導(dǎo)致丟失
Broker端:配置多副本同步(min.insync.replicas≥2
)和ISR(In-Sync Replicas)機(jī)制,僅同步成功的副本參與選舉;避免unclean.leader.election.enable=true
(防止數(shù)據(jù)不全的副本成為L(zhǎng)eader)
消費(fèi)端:關(guān)閉自動(dòng)提交位移(enable.auto.commit=false
),手動(dòng)同步提交(commitSync
)確保消息處理完成后再更新位移,結(jié)合消費(fèi)重試及死信隊(duì)列兜底
3、RabbitMQ解決消息丟失問(wèn)題
生產(chǎn)端:?jiǎn)⒂肞ublisher Confirm模式(異步確認(rèn)消息持久化)并設(shè)置mandatory=true
路由失敗回退,結(jié)合備份交換機(jī)處理無(wú)法路由的消息;事務(wù)消息因性能損耗僅限關(guān)鍵場(chǎng)景使用。
Broker端:消息與隊(duì)列均需持久化(durable=true
)防止宕機(jī)丟失,部署鏡像隊(duì)列集群實(shí)現(xiàn)多節(jié)點(diǎn)冗余;同步刷盤(pán)策略確保數(shù)據(jù)落盤(pán)后響應(yīng)。
消費(fèi)端:關(guān)閉自動(dòng)ACK,采用手動(dòng)ACK并在業(yè)務(wù)處理成功后提交確認(rèn);消費(fèi)失敗時(shí)重試(重試次數(shù)可配置)并最終轉(zhuǎn)入死信隊(duì)列人工干預(yù),避免消息因異常未處理而丟失。
消息積壓?jiǎn)栴}
1、RocketMQ解決消息積壓?jiǎn)栴}
RocketMQ通過(guò)橫向擴(kuò)展(增加消費(fèi)者實(shí)例、隊(duì)列數(shù)量)、提升消費(fèi)能力(線程池調(diào)優(yōu)、批量消費(fèi))、動(dòng)態(tài)擴(kuò)容、消息預(yù)取、死信隊(duì)列隔離無(wú)效消息,并支持消費(fèi)限流及監(jiān)控告警,快速定位處理積壓?jiǎn)栴}。
RocketMQ還提供了消息拉取和推拉模式,消費(fèi)者可以根據(jù)自身的處理能力主動(dòng)拉取消息,避免消息積壓過(guò)多。
2、Kafka解決消息積壓?jiǎn)栴}
Kafka通過(guò) 橫向擴(kuò)展(增加分區(qū)及消費(fèi)者實(shí)例)、優(yōu)化消費(fèi)者參數(shù)(如批量拉取、并發(fā)處理)、提升消費(fèi)邏輯效率(異步化、減少I(mǎi)/O),并動(dòng)態(tài)監(jiān)控消費(fèi)滯后指標(biāo)。
必要時(shí)限流生產(chǎn)者或臨時(shí)擴(kuò)容消費(fèi)組,結(jié)合分區(qū)再平衡策略快速分發(fā)積壓消息負(fù)載。
Kafka還提供了消息清理(compaction)和數(shù)據(jù)保留策略,可以根據(jù)時(shí)間或者數(shù)據(jù)大小來(lái)自動(dòng)刪除過(guò)期的消息,避免消息積壓過(guò)多。
3、RabbitMQ解決消息積壓?jiǎn)栴}
RabbitMQ通過(guò)調(diào)整消費(fèi)者的消費(fèi)速率來(lái)控制消息積壓。
可以使用QoS(Quality of Service)機(jī)制設(shè)置每個(gè)消費(fèi)者的預(yù)取計(jì)數(shù),限制每次從隊(duì)列中獲取的消息數(shù)量,以控制消費(fèi)者的處理速度。
RabbitMQ還支持消費(fèi)者端的流量控制,通過(guò)設(shè)置basic.qos或basic.consume命令的參數(shù)來(lái)控制消費(fèi)者的處理速度,避免消息過(guò)多導(dǎo)致積壓。
消息重復(fù)消費(fèi)問(wèn)題
1、RocketMQ解決消息重復(fù)消費(fèi)問(wèn)題
- 使用消息唯一標(biāo)識(shí)符(Message ID):在消息發(fā)送時(shí),為每條消息附加一個(gè)唯一標(biāo)識(shí)符。消費(fèi)者在處理消息時(shí),可以通過(guò)判斷消息唯一標(biāo)識(shí)符來(lái)避免重復(fù)消費(fèi)。可以將消息ID記錄在數(shù)據(jù)庫(kù)或緩存中,用于去重檢查。
- 消費(fèi)者端去重處理:消費(fèi)者在消費(fèi)消息時(shí),可以通過(guò)維護(hù)一個(gè)已消費(fèi)消息的列表或緩存,來(lái)避免重復(fù)消費(fèi)已經(jīng)處理過(guò)的消息。
2、Kafka解決消息重復(fù)消費(fèi)問(wèn)題
- 冪等性處理:在消費(fèi)者端實(shí)現(xiàn)冪等性邏輯,即多次消費(fèi)同一條消息所產(chǎn)生的結(jié)果與單次消費(fèi)的結(jié)果一致。這可以通過(guò)在業(yè)務(wù)邏輯中引入唯一標(biāo)識(shí)符或記錄已處理消息的狀態(tài)來(lái)實(shí)現(xiàn)。
- 消息確認(rèn)機(jī)制:消費(fèi)者在處理完消息后,提交已消費(fèi)的偏移量(Offset)給Kafka,Kafka會(huì)記錄已提交的偏移量,以便在消費(fèi)者重新啟動(dòng)時(shí)從正確的位置繼續(xù)消費(fèi)。消費(fèi)者可以定期提交偏移量,確保消息只被消費(fèi)一次。
3、RabbitMQ解決消息重復(fù)消費(fèi)問(wèn)題
- 冪等性處理:在消費(fèi)者端實(shí)現(xiàn)冪等性邏輯,即無(wú)論消息被消費(fèi)多少次,最終的結(jié)果應(yīng)該保持一致。這可以通過(guò)在消費(fèi)端進(jìn)行唯一標(biāo)識(shí)的檢查或者記錄已經(jīng)處理過(guò)的消息來(lái)實(shí)現(xiàn)。
- 消息確認(rèn)機(jī)制:消費(fèi)者在處理完消息后,發(fā)送確認(rèn)消息(ACK)給RabbitMQ,告知消息已經(jīng)成功處理。RabbitMQ根據(jù)接收到的確認(rèn)消息來(lái)判斷是否需要重新投遞消息給其他消費(fèi)者。
最為詳細(xì)的方案,請(qǐng)參考尼恩團(tuán)隊(duì)的架構(gòu)方案: 最系統(tǒng)的冪等性方案:一鎖二判三更新
消息有序性
1、Rabbitmq 解決有序性問(wèn)題
模式一:?jiǎn)侮?duì)列單消費(fèi)者模式
- 將需要保證順序的消息全部發(fā)送到同一個(gè)隊(duì)列,且消費(fèi)者設(shè)置為單線程處理。
- 原理:RabbitMQ 隊(duì)列天然支持 FIFO 順序存儲(chǔ),單消費(fèi)者避免并發(fā)處理導(dǎo)致亂序。
示例代:
// 生產(chǎn)者發(fā)送到同一隊(duì)列
rabbitTemplate.convertAndSend("order.queue", "message1");
rabbitTemplate.convertAndSend("order.queue", "message2");
// 消費(fèi)者單線程監(jiān)聽(tīng)
@RabbitListener(queues = "order.queue")
public void processOrder(String message) {
// 順序處理邏輯
}
缺點(diǎn):無(wú)法橫向擴(kuò)展消費(fèi)者,吞吐量受限。
模式二:消息分組策略
按業(yè)務(wù)標(biāo)識(shí)分區(qū)(如訂單 ID、用戶 ID),相同分組的消息路由到同一隊(duì)列,每個(gè)隊(duì)列對(duì)應(yīng)一個(gè)消費(fèi)者。
實(shí)現(xiàn)方式: 生產(chǎn)者通過(guò)哈希算法或自定義路由鍵將關(guān)聯(lián)的消息分配到特定隊(duì)列。
- 生產(chǎn)者根據(jù)業(yè)務(wù)標(biāo)識(shí)生成路由鍵,如
routingKey = orderId.hashCode() % queueCount
。 - 聲明多個(gè)隊(duì)列,綁定到同一交換機(jī),并根據(jù)路由鍵規(guī)則分發(fā)消息。
代碼示例:
// 生產(chǎn)者發(fā)送消息時(shí)指定路由鍵
String orderId = "ORDER_1001";
String routingKey = "order." + (orderId.hashCode() % 3); // 分配到3個(gè)隊(duì)列之一
rabbitTemplate.convertAndSend("order.exchange", routingKey, message);
優(yōu)勢(shì):在保證同分組順序性的同時(shí),允許不同分組并行處理。
消費(fèi)者并發(fā)控制 設(shè)置
prefetchCount=1
確保每次只處理一個(gè)消息,關(guān)閉自動(dòng)應(yīng)答,手動(dòng)確認(rèn)后再獲取新消息:
spring:
rabbitmq:
listener:
simple:
prefetch: 1
效果:防止消費(fèi)者同時(shí)處理多個(gè)消息導(dǎo)致亂序。
2、RocketMQ解決有序性問(wèn)題
RocketMQ實(shí)現(xiàn)順序消息的核心是通過(guò)生產(chǎn)端和消費(fèi)端雙重保障:
- 全局順序需單隊(duì)列(性能受限),分區(qū)順序通過(guò)Sharding Key哈希分散到不同隊(duì)列,兼顧吞吐量與局部有序性。需避免異步消費(fèi)、消息重試亂序,失敗時(shí)跳過(guò)當(dāng)前消息防止阻塞
- 生產(chǎn)者使用MessageQueueSelector將同一業(yè)務(wù)標(biāo)識(shí)(如訂單ID)的消息強(qiáng)制路由至同一隊(duì)列,利用隊(duì)列FIFO特性保序;
- 消費(fèi)端對(duì) 同一隊(duì)列啟用 單線程拉取 + 分區(qū)鎖機(jī)制(ConsumeOrderlyContext),確保串行處理。
3、Kafka解決有序性問(wèn)題
Kafka實(shí)現(xiàn)順序消息的核心在于分區(qū)順序性:
- 生產(chǎn)端:相同業(yè)務(wù)標(biāo)識(shí)(如訂單ID)的消息通過(guò)固定Key哈希至同一分區(qū)(
Partitioner
),利用分區(qū)內(nèi)消息天然有序性保序; - 消費(fèi)端:每個(gè)分區(qū)僅由同一消費(fèi)者組的一個(gè)線程消費(fèi)(單線程串行處理),避免并發(fā)消費(fèi)亂序;
事務(wù)消息
1、RabbitMQ的事務(wù)消息
- RabbitMQ支持事務(wù)消息的發(fā)送和確認(rèn)。在發(fā)送消息之前,可以通過(guò)調(diào)用"channel.txSelect()"來(lái)開(kāi)啟事務(wù),然后將要發(fā)送的消息發(fā)布到交換機(jī)中。如果事務(wù)成功提交,消息將被發(fā)送到隊(duì)列,否則事務(wù)會(huì)回滾,消息不會(huì)被發(fā)送。
- 在消費(fèi)端,可以通過(guò)"channel.txSelect()"開(kāi)啟事務(wù),然后使用"basicAck"手動(dòng)確認(rèn)消息的處理結(jié)果。如果事務(wù)成功提交,消費(fèi)端會(huì)發(fā)送ACK確認(rèn)消息的處理;否則,事務(wù)回滾,消息將被重新投遞。
public class RabbitMQTransactionDemo {
private static final String QUEUE_NAME = "transaction_queue";
public static void main(String[] args) {
try {
// 創(chuàng)建連接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
// 創(chuàng)建連接
Connection connection = factory.newConnection();
// 創(chuàng)建信道
Channel channel = connection.createChannel();
// 聲明隊(duì)列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
try {
// 開(kāi)啟事務(wù)
channel.txSelect();
// 發(fā)送消息
String message = "Hello, RabbitMQ!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
// 提交事務(wù)
channel.txCommit();
} catch (Exception e) {
// 事務(wù)回滾
channel.txRollback();
e.printStackTrace();
}
// 關(guān)閉信道和連接
channel.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
2、RocketMQ的事務(wù)消息
RocketMQ提供了事務(wù)消息的機(jī)制,確保消息的可靠性和一致性。
發(fā)送事務(wù)消息時(shí),需要將消息發(fā)送到半消息隊(duì)列,然后執(zhí)行本地事務(wù)邏輯。
事務(wù)執(zhí)行成功后,通過(guò)調(diào)用"TransactionStatus.CommitTransaction"提交事務(wù)消息;若事務(wù)執(zhí)行失敗,則通過(guò)調(diào)用"TransactionStatus.RollbackTransaction"回滾事務(wù)消息。
事務(wù)消息的最終狀態(tài)由消息生產(chǎn)者根據(jù)事務(wù)執(zhí)行結(jié)果進(jìn)行確認(rèn)。
public class RocketMQTransactionDemo {
public static void main(String[] args) throws Exception {
// 創(chuàng)建事務(wù)消息生產(chǎn)者
TransactionMQProducer producer = new TransactionMQProducer("group_name");
producer.setNamesrvAddr("localhost:9876");
// 設(shè)置事務(wù)監(jiān)聽(tīng)器
producer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 執(zhí)行本地事務(wù)邏輯,根據(jù)業(yè)務(wù)邏輯結(jié)果返回相應(yīng)的狀態(tài)
// 返回 LocalTransactionState.COMMIT_MESSAGE 表示事務(wù)提交
// 返回 LocalTransactionState.ROLLBACK_MESSAGE 表示事務(wù)回滾
// 返回 LocalTransactionState.UNKNOW 表示事務(wù)狀態(tài)未知
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 根據(jù)消息的狀態(tài),來(lái)判斷本地事務(wù)的最終狀態(tài)
// 返回 LocalTransactionState.COMMIT_MESSAGE 表示事務(wù)提交
// 返回 LocalTransactionState.ROLLBACK_MESSAGE 表示事務(wù)回滾
// 返回 LocalTransactionState.UNKNOW 表示事務(wù)狀態(tài)未知
}
});
// 啟動(dòng)事務(wù)消息生產(chǎn)者
producer.start();
// 構(gòu)造消息
Message msg = new Message("topic_name", "tag_name", "Hello, RocketMQ!".getBytes());
// 發(fā)送事務(wù)消息
TransactionSendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.println("Send Result: " + sendResult);
// 關(guān)閉事務(wù)消息生產(chǎn)者
producer.shutdown();
}
}
3、Kafka的事務(wù)消息
Kafka引入了事務(wù)功能來(lái)確保消息的原子性和一致性。事務(wù)消息的發(fā)送和確認(rèn)在生產(chǎn)者端進(jìn)行。
生產(chǎn)者可以通過(guò)初始化事務(wù),將一系列的消息寫(xiě)入事務(wù),然后通過(guò)"commitTransaction()"提交事務(wù),或者通過(guò)"abortTransaction()"中止事務(wù)。
Kafka會(huì)保證在事務(wù)提交之前,寫(xiě)入的所有消息不會(huì)被消費(fèi)者可見(jiàn),以保持事務(wù)的一致性。
public class KafkaTransactionDemo {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactional_id");
Producer<String, String> producer = new KafkaProducer<>(props);
// 初始化事務(wù)
producer.initTransactions();
try {
// 開(kāi)啟事務(wù)
producer.beginTransaction();
// 發(fā)送消息
ProducerRecord<String, String> record = new ProducerRecord<>("topic_name", "Hello, Kafka!");
producer.send(record);
// 提交事務(wù)
producer.commitTransaction();
} catch (ProducerFencedException e) {
// 處理異常情況
producer.close();
} finally {
producer.close();
}
}
}
消息確認(rèn) ACK機(jī)制
1、RabbitMQ的ACK機(jī)制
RabbitMQ使用ACK(消息確認(rèn))機(jī)制來(lái)確保消息的可靠傳遞。
消費(fèi)者收到消息后,需要向RabbitMQ發(fā)送ACK來(lái)確認(rèn)消息的處理狀態(tài)。
只有在收到ACK后,RabbitMQ才會(huì)將消息標(biāo)記為已成功傳遞,否則會(huì)將消息重新投遞給其他消費(fèi)者或者保留在隊(duì)列中。
以下是RabbitMQ ACK的Java示例:
public class RabbitMQAckDemo {
public static void main(String[] args) throws Exception {
// 創(chuàng)建連接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
// 創(chuàng)建連接
Connection connection = factory.newConnection();
// 創(chuàng)建信道
Channel channel = connection.createChannel();
// 聲明隊(duì)列
String queueName = "queue_name";
channel.queueDeclare(queueName, false, false, false, null);
// 創(chuàng)建消費(fèi)者
String consumerTag = "consumer_tag";
boolean autoAck = false; // 關(guān)閉自動(dòng)ACK
// 消費(fèi)消息
channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 消費(fèi)消息
String message = new String(body, "UTF-8");
System.out.println("Received message: " + message);
try {
// 模擬處理消息的業(yè)務(wù)邏輯
processMessage(message);
// 手動(dòng)發(fā)送ACK確認(rèn)消息
long deliveryTag = envelope.getDeliveryTag();
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
// 處理消息異常,可以選擇重試或者記錄日志等操作
System.out.println("Failed to process message: " + message);
e.printStackTrace();
// 手動(dòng)發(fā)送NACK拒絕消息,并可選是否重新投遞
long deliveryTag = envelope.getDeliveryTag();
boolean requeue = true; // 重新投遞消息
channel.basicNack(deliveryTag, false, requeue);
}
}
});
}
private static void processMessage(String message) {
// 模擬處理消息的業(yè)務(wù)邏輯
}
}
2、RocketMQ的ACK機(jī)制
RocketMQ的ACK機(jī)制由消費(fèi)者控制,消費(fèi)者從消息隊(duì)列中消費(fèi)消息后,可以手動(dòng)發(fā)送ACK確認(rèn)消息的處理狀態(tài)。
只有在收到ACK后,RocketMQ才會(huì)將消息標(biāo)記為已成功消費(fèi),否則會(huì)將消息重新投遞給其他消費(fèi)者。
public class RocketMQAckDemo {
public static void main(String[] args) throws Exception {
// 創(chuàng)建消費(fèi)者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_name");
consumer.setNamesrvAddr("localhost:9876");
// 訂閱消息
consumer.subscribe("topic_name", "*");
// 注冊(cè)消息監(jiān)聽(tīng)器
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (MessageExt message : msgs) {
try {
// 消費(fèi)消息
String msgBody = new String(message.getBody(), "UTF-8");
System.out.println("Received message: " + msgBody);
// 模擬處理消息的業(yè)務(wù)邏輯
processMessage(msgBody);
// 手動(dòng)發(fā)送ACK確認(rèn)消息
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
// 處理消息異常,可以選擇重試或者記錄日志等操作
System.out.println("Failed to process message: " + new String(message.getBody()));
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 啟動(dòng)消費(fèi)者
consumer.start();
}
private static void processMessage(String message) {
// 模擬處理消息的業(yè)務(wù)邏輯
}
}
3、Kafka的ACK機(jī)制
Kafka的ACK機(jī)制用于控制生產(chǎn)者在發(fā)送消息后,需要等待多少個(gè)副本確認(rèn)才視為消息發(fā)送成功。
這個(gè)機(jī)制可以通過(guò)設(shè)置acks參數(shù)來(lái)進(jìn)行配置。在Kafka中,acks參數(shù)有三個(gè)可選值:
acks=0:生產(chǎn)者在發(fā)送消息后不需要等待任何確認(rèn),直接將消息發(fā)送給Kafka集群。這種方式具有最高的吞吐量,但是也存在數(shù)據(jù)丟失的風(fēng)險(xiǎn),因?yàn)樯a(chǎn)者不會(huì)知道消息是否成功發(fā)送給任何副本。
acks=1:生產(chǎn)者在發(fā)送消息后只需要等待首領(lǐng)副本(leader replica)確認(rèn)。一旦首領(lǐng)副本成功接收到消息,生產(chǎn)者就會(huì)收到確認(rèn)。這種方式提供了一定的可靠性,但是如果首領(lǐng)副本在接收消息后但在確認(rèn)之前發(fā)生故障,仍然可能會(huì)導(dǎo)致數(shù)據(jù)丟失。
acks=all:生產(chǎn)者在發(fā)送消息后需要等待所有副本都確認(rèn)。只有當(dāng)所有副本都成功接收到消息后,生產(chǎn)者才會(huì)收到確認(rèn)。這是最安全的確認(rèn)機(jī)制,確保了消息不會(huì)丟失,但是需要更多的時(shí)間和資源。acks=-1與acks=all是等效的。
public classKafkaProducerDemo{
public static void main(String[]args){
// 配置Kafka生產(chǎn)者的參數(shù)
Propertiesprops=newProperties();
props.put("bootstrap.servers","localhost:9092");// Kafka集群的地址和端口
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");// 鍵的序列化器
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");// 值的序列化器
props.put("acks","all");// 設(shè)置ACK機(jī)制為所有副本都確認(rèn)
// 創(chuàng)建生產(chǎn)者實(shí)例
KafkaProducer<String,String>producer=newKafkaProducer<>(props);
// 構(gòu)造消息
Stringtopic="my_topic";
Stringkey="my_key";
Stringvalue="Hello, Kafka!";
// 創(chuàng)建消息記錄
ProducerRecord<String,String>record=newProducerRecord<>(topic,key,value);
// 發(fā)送消息
producer.send(record,newCallback(){
@Override
publicvoidonCompletion(RecordMetadatametadata,Exceptionexception){
if(exception!=null){
System.err.println("發(fā)送消息出現(xiàn)異常:"+exception.getMessage());
}else{
System.out.println("消息發(fā)送成功!位于分區(qū) "+metadata.partition()+",偏移量 "+metadata.offset());
}
}
});
// 關(guān)閉生產(chǎn)者
producer.close();
}
}
延遲消息實(shí)現(xiàn)
延遲隊(duì)列在實(shí)際項(xiàng)目中有非常多的應(yīng)用場(chǎng)景,最常見(jiàn)的比如訂單未支付,超時(shí)取消訂單,在創(chuàng)建訂單的時(shí)候發(fā)送一條延遲消息,達(dá)到延遲時(shí)間之后消費(fèi)者收到消息,如果訂單沒(méi)有支付的話,那么就取消訂單。
image-20250508192415995
1、RocketMQ實(shí)現(xiàn)延遲消息
RocketMQ 默認(rèn)時(shí)間間隔分為 18 個(gè)級(jí)別,基本上也能滿足大部分場(chǎng)景的需要了。
默認(rèn)延遲級(jí)別:
1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h
使用起來(lái)也非常的簡(jiǎn)單,直接通過(guò)setDelayTimeLevel
設(shè)置延遲級(jí)別即可。
setDelayTimeLevel(level)
實(shí)現(xiàn)原理說(shuō)起來(lái)比較簡(jiǎn)單,Broker 會(huì)根據(jù)不同的延遲級(jí)別創(chuàng)建出多個(gè)不同級(jí)別的隊(duì)列,當(dāng)我們發(fā)送延遲消息的時(shí)候,根據(jù)不同的延遲級(jí)別發(fā)送到不同的隊(duì)列中,同時(shí)在 Broker 內(nèi)部通過(guò)一個(gè)定時(shí)器去輪詢這些隊(duì)列(RocketMQ 會(huì)為每個(gè)延遲級(jí)別分別創(chuàng)建一個(gè)定時(shí)任務(wù)),如果消息達(dá)到發(fā)送時(shí)間,那么就直接把消息發(fā)送到指 topic 隊(duì)列中。
RocketMQ 這種實(shí)現(xiàn)方式是放在服務(wù)端去做的,同時(shí)有個(gè)好處就是相同延遲時(shí)間的消息是可以保證有序性的。
談到這里就順便提一下關(guān)于消息消費(fèi)重試的原理,這個(gè)本質(zhì)上來(lái)說(shuō)其實(shí)是一樣的,對(duì)于消費(fèi)失敗需要重試的消息實(shí)際上都會(huì)被丟到延遲隊(duì)列的 topic 里,到期后再轉(zhuǎn)發(fā)到真正的 topic 中。
image-20250508192539070
2、RabbitMQ實(shí)現(xiàn)延遲消息
RabbitMQ本身并不存在延遲隊(duì)列的概念,在 RabbitMQ 中是通過(guò) DLX 死信交換機(jī)和 TTL 消息過(guò)期來(lái)實(shí)現(xiàn)延遲隊(duì)列的。
TTL(Time to Live)過(guò)期時(shí)間
有兩種方式可以設(shè)置 TTL。
(1) 通過(guò)隊(duì)列屬性設(shè)置,這樣的話隊(duì)列中的所有消息都會(huì)擁有相同的過(guò)期時(shí)間(2) 對(duì)消息單獨(dú)設(shè)置過(guò)期時(shí)間,這樣每條消息的過(guò)期時(shí)間都可以不同
那么如果同時(shí)設(shè)置呢?這樣將會(huì)以兩個(gè)時(shí)間中較小的值為準(zhǔn)。
針對(duì)隊(duì)列的方式通過(guò)參數(shù)x-message-ttl
來(lái)設(shè)置。
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 6000);
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);
針對(duì)消息的方式通過(guò)setExpiration
來(lái)設(shè)置。
AMQP.BasicProperties properties = new AMQP.BasicProperties();
Properties.setDeliveryMode(2);
properties.setExpiration("60000");
channel.basicPublish(exchangeName, routingKey, mandatory, properties, "message".getBytes());
DLX(Dead Letter Exchange)死信交換機(jī)
一個(gè)消息要成為死信消息有 3 種情況:
(1) 消息被拒絕,比如調(diào)用reject
方法,并且需要設(shè)置requeue
為false
(2) 消息過(guò)期
(3) 隊(duì)列達(dá)到最大長(zhǎng)度
可以通過(guò)參數(shù)dead-letter-exchange
設(shè)置死信交換機(jī),也可以通過(guò)參數(shù)dead-letter- exchange
指定 RoutingKey(未指定則使用原隊(duì)列的 RoutingKey)。
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "exchange.dlx");
args.put("x-dead-letter-routing-key", "routingkey");
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);
實(shí)現(xiàn)原理
當(dāng)我們對(duì)消息設(shè)置了 TTL 和 DLX 之后,當(dāng)消息正常發(fā)送,通過(guò) Exchange 到達(dá) Queue 之后,由于設(shè)置了 TTL 過(guò)期時(shí)間,并且消息沒(méi)有被消費(fèi)(訂閱的是死信隊(duì)列),達(dá)到過(guò)期時(shí)間之后,消息就轉(zhuǎn)移到與之綁定的 DLX 死信隊(duì)列之中。
這樣的話,就相當(dāng)于通過(guò) DLX 和 TTL 間接實(shí)現(xiàn)了延遲消息的功能,實(shí)際使用中我們可以根據(jù)不同的延遲級(jí)別綁定設(shè)置不同延遲時(shí)間的隊(duì)列來(lái)達(dá)到實(shí)現(xiàn)不同延遲時(shí)間的效果。
如果隊(duì)列通過(guò) dead-letter-exchange 屬性指定了一個(gè)交換機(jī),那么該隊(duì)列中的死信就會(huì)投遞到這個(gè)交換機(jī)中,這個(gè)交換機(jī)稱為死信交換機(jī)(Dead Letter Exchange,簡(jiǎn)稱DLX)。
image-20250508192505250
3、Kafka實(shí)現(xiàn)延遲消息
對(duì)于 Kafka 來(lái)說(shuō),原生并不支持延遲隊(duì)列的功能,需要我們手動(dòng)去實(shí)現(xiàn),這里我根據(jù) RocketMQ 的設(shè)計(jì)提供一個(gè)實(shí)現(xiàn)思路。
這個(gè)設(shè)計(jì),我們也不支持任意時(shí)間精度的延遲消息,只支持固定級(jí)別的延遲,因?yàn)閷?duì)于大部分延遲消息的場(chǎng)景來(lái)說(shuō)足夠使用了。
只創(chuàng)建一個(gè) topic,但是針對(duì)該 topic 創(chuàng)建 18 個(gè) partition,每個(gè) partition 對(duì)應(yīng)不同的延遲級(jí)別,這樣做和 RocketMQ 一樣有個(gè)好處就是能達(dá)到相同延遲時(shí)間的消息達(dá)到有序性。
應(yīng)用級(jí) Kafka 延遲消息實(shí)現(xiàn)原理
首先創(chuàng)建一個(gè)單獨(dú)針對(duì)延遲隊(duì)列的 topic,同時(shí)創(chuàng)建 18 個(gè) partition 針對(duì)不同的延遲級(jí)別。
發(fā)送消息的時(shí)候根據(jù)延遲參數(shù)發(fā)送到延遲 topic 對(duì)應(yīng)的 partition,對(duì)應(yīng)的key
為延遲時(shí)間,同時(shí)把原 topic 保存到 header 中。
ProducerRecord<Object, Object> producerRecord = new ProducerRecord<>("delay_topic", delayPartition, delayTime, data);
producerRecord.headers().add("origin_topic", topic.getBytes(StandardCharsets.UTF_8));
內(nèi)嵌的consumer
單獨(dú)設(shè)置一個(gè)ConsumerGroup
去消費(fèi)延遲 topic 消息,消費(fèi)到消息之后如果沒(méi)有達(dá)到延遲時(shí)間那么就進(jìn)行pause
,然后seek
到當(dāng)前ConsumerRecord
的offset
位置,同時(shí)使用定時(shí)器去輪詢延遲的TopicPartition
,達(dá)到延遲時(shí)間之后進(jìn)行resume
。
如果達(dá)到了延遲時(shí)間,那么就獲取到header
中的真實(shí) topic ,直接轉(zhuǎn)發(fā)。
這里為什么要進(jìn)行pause
和resume
呢?
因?yàn)槿绻贿@樣的話,如果超時(shí)未消費(fèi)達(dá)到max.poll.interval.ms
最大時(shí)間(默認(rèn)300s),那么將會(huì)觸發(fā) Rebalance。