成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

美團(tuán)面試:對(duì)比分析 RocketMQ、Kafka、RabbitMQ 三大MQ常見(jiàn)問(wèn)題?

開(kāi)發(fā) 架構(gòu)
RocketMQ通過(guò)橫向擴(kuò)展(增加消費(fèi)者實(shí)例、隊(duì)列數(shù)量)、提升消費(fèi)能力(線程池調(diào)優(yōu)、批量消費(fèi))、動(dòng)態(tài)擴(kuò)容、消息預(yù)取、死信隊(duì)列隔離無(wú)效消息,并支持消費(fèi)限流及監(jiān)控告警,快速定位處理積壓?jiǎn)栴}。

三大MQ指標(biāo)對(duì)比

分布式、微服務(wù)、高并發(fā)架構(gòu)中,消息隊(duì)列(Message Queue,簡(jiǎn)稱MQ)扮演著至關(guān)重要的角色。

消息隊(duì)列用于實(shí)現(xiàn)系統(tǒng)間的異步通信、解耦、削峰填谷等功能。

對(duì)比指標(biāo)

RabbitMQ

RocketMQ

Kafka

應(yīng)用場(chǎng)景

中小規(guī)模應(yīng)用場(chǎng)景

分布式事務(wù)、實(shí)時(shí)日志處理

大規(guī)模數(shù)據(jù)處理、實(shí)時(shí)流處理

開(kāi)發(fā)語(yǔ)言

Erlang

Java

Scala & Java

消息可靠性

最高 (AMQP協(xié)議保證)

較高 (基于事務(wù)保證)

中等 (基于副本機(jī)制保證)

消息吞吐量

低 萬(wàn)級(jí)到十萬(wàn)級(jí)

中等 十萬(wàn)級(jí)到百萬(wàn)級(jí)

高 百萬(wàn)級(jí)或更高

時(shí)效性

毫秒級(jí)

毫秒級(jí)

毫秒級(jí)

支持的語(yǔ)言和平臺(tái)

Java、C++、Python等

Java、C++、Go等

Java、Scala、Python等

架構(gòu)模型

virtual host、broker、exchange、queue

nameserver、controller、broker

broker、topic、partition、zookeeper/Kraft

社區(qū)活躍度和生態(tài)建設(shè)

中等 活躍的開(kāi)源社區(qū)和豐富的插件生態(tài)系統(tǒng)

較高 阿里巴巴開(kāi)源,穩(wěn)定的社區(qū)支持

最高 活躍的開(kāi)源社區(qū)和廣泛的應(yīng)用

github star

10.8k

19.4k

25.2k

對(duì)比分析三大MQ常見(jiàn)問(wèn)題

下面, 對(duì)比分析三大MQ常見(jiàn)問(wèn)題。

消息丟失問(wèn)題


image-20250508192254071image-20250508192254071

1、RocketMQ解決消息丟失問(wèn)題

生產(chǎn)端:  采用同步發(fā)送(等待Broker確認(rèn))并啟用重試機(jī)制,結(jié)合事務(wù)消息(如預(yù)提交half消息+二次確認(rèn)commit)確保消息可靠投遞。

Broker端:配置同步刷盤(pán)(消息寫(xiě)入磁盤(pán)后返回確認(rèn))和多副本同步機(jī)制(主從節(jié)點(diǎn)數(shù)據(jù)冗余)防止宕機(jī)丟失,同時(shí)通過(guò)集群容災(zāi)保障高可用。

消費(fèi)端:消費(fèi)者需手動(dòng)ACK確認(rèn),失敗時(shí)觸發(fā)自動(dòng)重試(默認(rèn)16次),最終失敗消息轉(zhuǎn)入死信隊(duì)列人工處理,避免異常場(chǎng)景下消息丟失。

2、Kafka解決消息丟失問(wèn)題

生產(chǎn)端:設(shè)置acks=all確保消息被所有副本持久化后才響應(yīng),啟用生產(chǎn)者重試(retries)及冪等性(enable.idempotence=true)防止網(wǎng)絡(luò)抖動(dòng)或Broker異常導(dǎo)致丟失

Broker端:配置多副本同步(min.insync.replicas≥2)和ISR(In-Sync Replicas)機(jī)制,僅同步成功的副本參與選舉;避免unclean.leader.election.enable=true(防止數(shù)據(jù)不全的副本成為L(zhǎng)eader)

消費(fèi)端:關(guān)閉自動(dòng)提交位移(enable.auto.commit=false),手動(dòng)同步提交(commitSync)確保消息處理完成后再更新位移,結(jié)合消費(fèi)重試及死信隊(duì)列兜底

3、RabbitMQ解決消息丟失問(wèn)題

生產(chǎn)端:?jiǎn)⒂肞ublisher Confirm模式(異步確認(rèn)消息持久化)并設(shè)置mandatory=true路由失敗回退,結(jié)合備份交換機(jī)處理無(wú)法路由的消息;事務(wù)消息因性能損耗僅限關(guān)鍵場(chǎng)景使用。

Broker端:消息與隊(duì)列均需持久化(durable=true)防止宕機(jī)丟失,部署鏡像隊(duì)列集群實(shí)現(xiàn)多節(jié)點(diǎn)冗余;同步刷盤(pán)策略確保數(shù)據(jù)落盤(pán)后響應(yīng)。

消費(fèi)端:關(guān)閉自動(dòng)ACK,采用手動(dòng)ACK并在業(yè)務(wù)處理成功后提交確認(rèn);消費(fèi)失敗時(shí)重試(重試次數(shù)可配置)并最終轉(zhuǎn)入死信隊(duì)列人工干預(yù),避免消息因異常未處理而丟失。

消息積壓?jiǎn)栴}

1、RocketMQ解決消息積壓?jiǎn)栴}

RocketMQ通過(guò)橫向擴(kuò)展(增加消費(fèi)者實(shí)例、隊(duì)列數(shù)量)、提升消費(fèi)能力(線程池調(diào)優(yōu)、批量消費(fèi))、動(dòng)態(tài)擴(kuò)容、消息預(yù)取、死信隊(duì)列隔離無(wú)效消息,并支持消費(fèi)限流及監(jiān)控告警,快速定位處理積壓?jiǎn)栴}。

RocketMQ還提供了消息拉取和推拉模式,消費(fèi)者可以根據(jù)自身的處理能力主動(dòng)拉取消息,避免消息積壓過(guò)多。

2、Kafka解決消息積壓?jiǎn)栴}

Kafka通過(guò)  橫向擴(kuò)展(增加分區(qū)及消費(fèi)者實(shí)例)、優(yōu)化消費(fèi)者參數(shù)(如批量拉取、并發(fā)處理)、提升消費(fèi)邏輯效率(異步化、減少I(mǎi)/O),并動(dòng)態(tài)監(jiān)控消費(fèi)滯后指標(biāo)。

必要時(shí)限流生產(chǎn)者或臨時(shí)擴(kuò)容消費(fèi)組,結(jié)合分區(qū)再平衡策略快速分發(fā)積壓消息負(fù)載。

Kafka還提供了消息清理(compaction)和數(shù)據(jù)保留策略,可以根據(jù)時(shí)間或者數(shù)據(jù)大小來(lái)自動(dòng)刪除過(guò)期的消息,避免消息積壓過(guò)多。

3、RabbitMQ解決消息積壓?jiǎn)栴}

RabbitMQ通過(guò)調(diào)整消費(fèi)者的消費(fèi)速率來(lái)控制消息積壓。

可以使用QoS(Quality of Service)機(jī)制設(shè)置每個(gè)消費(fèi)者的預(yù)取計(jì)數(shù),限制每次從隊(duì)列中獲取的消息數(shù)量,以控制消費(fèi)者的處理速度。

RabbitMQ還支持消費(fèi)者端的流量控制,通過(guò)設(shè)置basic.qos或basic.consume命令的參數(shù)來(lái)控制消費(fèi)者的處理速度,避免消息過(guò)多導(dǎo)致積壓。

消息重復(fù)消費(fèi)問(wèn)題

1、RocketMQ解決消息重復(fù)消費(fèi)問(wèn)題

  • 使用消息唯一標(biāo)識(shí)符(Message ID):在消息發(fā)送時(shí),為每條消息附加一個(gè)唯一標(biāo)識(shí)符。消費(fèi)者在處理消息時(shí),可以通過(guò)判斷消息唯一標(biāo)識(shí)符來(lái)避免重復(fù)消費(fèi)。可以將消息ID記錄在數(shù)據(jù)庫(kù)或緩存中,用于去重檢查。
  • 消費(fèi)者端去重處理:消費(fèi)者在消費(fèi)消息時(shí),可以通過(guò)維護(hù)一個(gè)已消費(fèi)消息的列表或緩存,來(lái)避免重復(fù)消費(fèi)已經(jīng)處理過(guò)的消息。

2、Kafka解決消息重復(fù)消費(fèi)問(wèn)題

  • 冪等性處理:在消費(fèi)者端實(shí)現(xiàn)冪等性邏輯,即多次消費(fèi)同一條消息所產(chǎn)生的結(jié)果與單次消費(fèi)的結(jié)果一致。這可以通過(guò)在業(yè)務(wù)邏輯中引入唯一標(biāo)識(shí)符或記錄已處理消息的狀態(tài)來(lái)實(shí)現(xiàn)。
  • 消息確認(rèn)機(jī)制:消費(fèi)者在處理完消息后,提交已消費(fèi)的偏移量(Offset)給Kafka,Kafka會(huì)記錄已提交的偏移量,以便在消費(fèi)者重新啟動(dòng)時(shí)從正確的位置繼續(xù)消費(fèi)。消費(fèi)者可以定期提交偏移量,確保消息只被消費(fèi)一次。

3、RabbitMQ解決消息重復(fù)消費(fèi)問(wèn)題

  • 冪等性處理:在消費(fèi)者端實(shí)現(xiàn)冪等性邏輯,即無(wú)論消息被消費(fèi)多少次,最終的結(jié)果應(yīng)該保持一致。這可以通過(guò)在消費(fèi)端進(jìn)行唯一標(biāo)識(shí)的檢查或者記錄已經(jīng)處理過(guò)的消息來(lái)實(shí)現(xiàn)。
  • 消息確認(rèn)機(jī)制:消費(fèi)者在處理完消息后,發(fā)送確認(rèn)消息(ACK)給RabbitMQ,告知消息已經(jīng)成功處理。RabbitMQ根據(jù)接收到的確認(rèn)消息來(lái)判斷是否需要重新投遞消息給其他消費(fèi)者。

最為詳細(xì)的方案,請(qǐng)參考尼恩團(tuán)隊(duì)的架構(gòu)方案: 最系統(tǒng)的冪等性方案:一鎖二判三更新

消息有序性

1、Rabbitmq 解決有序性問(wèn)題

模式一:?jiǎn)侮?duì)列單消費(fèi)者模式
  • 將需要保證順序的消息全部發(fā)送到同一個(gè)隊(duì)列,且消費(fèi)者設(shè)置為單線程處理。
  • 原理:RabbitMQ 隊(duì)列天然支持 FIFO 順序存儲(chǔ),單消費(fèi)者避免并發(fā)處理導(dǎo)致亂序。

示例代:

// 生產(chǎn)者發(fā)送到同一隊(duì)列
  rabbitTemplate.convertAndSend("order.queue", "message1");
  rabbitTemplate.convertAndSend("order.queue", "message2");
  // 消費(fèi)者單線程監(jiān)聽(tīng)
  @RabbitListener(queues = "order.queue")
  public void processOrder(String message) {
      // 順序處理邏輯
  }

缺點(diǎn):無(wú)法橫向擴(kuò)展消費(fèi)者,吞吐量受限。

模式二:消息分組策略

按業(yè)務(wù)標(biāo)識(shí)分區(qū)(如訂單 ID、用戶 ID),相同分組的消息路由到同一隊(duì)列,每個(gè)隊(duì)列對(duì)應(yīng)一個(gè)消費(fèi)者。

實(shí)現(xiàn)方式: 生產(chǎn)者通過(guò)哈希算法或自定義路由鍵將關(guān)聯(lián)的消息分配到特定隊(duì)列。

  • 生產(chǎn)者根據(jù)業(yè)務(wù)標(biāo)識(shí)生成路由鍵,如 routingKey = orderId.hashCode() % queueCount
  • 聲明多個(gè)隊(duì)列,綁定到同一交換機(jī),并根據(jù)路由鍵規(guī)則分發(fā)消息。

代碼示例:

// 生產(chǎn)者發(fā)送消息時(shí)指定路由鍵
String orderId = "ORDER_1001";
String routingKey = "order." + (orderId.hashCode() % 3);  // 分配到3個(gè)隊(duì)列之一
rabbitTemplate.convertAndSend("order.exchange", routingKey, message);

優(yōu)勢(shì):在保證同分組順序性的同時(shí),允許不同分組并行處理。

消費(fèi)者并發(fā)控制 設(shè)置

prefetchCount=1

確保每次只處理一個(gè)消息,關(guān)閉自動(dòng)應(yīng)答,手動(dòng)確認(rèn)后再獲取新消息:

spring:
  rabbitmq:
         listener:
           simple:
             prefetch: 1

效果:防止消費(fèi)者同時(shí)處理多個(gè)消息導(dǎo)致亂序。

2、RocketMQ解決有序性問(wèn)題

RocketMQ實(shí)現(xiàn)順序消息的核心是通過(guò)生產(chǎn)端和消費(fèi)端雙重保障:

  • 全局順序需單隊(duì)列(性能受限),分區(qū)順序通過(guò)Sharding Key哈希分散到不同隊(duì)列,兼顧吞吐量與局部有序性。需避免異步消費(fèi)、消息重試亂序,失敗時(shí)跳過(guò)當(dāng)前消息防止阻塞
  • 生產(chǎn)者使用MessageQueueSelector將同一業(yè)務(wù)標(biāo)識(shí)(如訂單ID)的消息強(qiáng)制路由至同一隊(duì)列,利用隊(duì)列FIFO特性保序;
  • 消費(fèi)端對(duì)  同一隊(duì)列啟用 單線程拉取 + 分區(qū)鎖機(jī)制(ConsumeOrderlyContext),確保串行處理。

3、Kafka解決有序性問(wèn)題

Kafka實(shí)現(xiàn)順序消息的核心在于分區(qū)順序性:

  • 生產(chǎn)端:相同業(yè)務(wù)標(biāo)識(shí)(如訂單ID)的消息通過(guò)固定Key哈希至同一分區(qū)(Partitioner),利用分區(qū)內(nèi)消息天然有序性保序;
  • 消費(fèi)端:每個(gè)分區(qū)僅由同一消費(fèi)者組的一個(gè)線程消費(fèi)(單線程串行處理),避免并發(fā)消費(fèi)亂序;

事務(wù)消息

1、RabbitMQ的事務(wù)消息

  • RabbitMQ支持事務(wù)消息的發(fā)送和確認(rèn)。在發(fā)送消息之前,可以通過(guò)調(diào)用"channel.txSelect()"來(lái)開(kāi)啟事務(wù),然后將要發(fā)送的消息發(fā)布到交換機(jī)中。如果事務(wù)成功提交,消息將被發(fā)送到隊(duì)列,否則事務(wù)會(huì)回滾,消息不會(huì)被發(fā)送。
  • 在消費(fèi)端,可以通過(guò)"channel.txSelect()"開(kāi)啟事務(wù),然后使用"basicAck"手動(dòng)確認(rèn)消息的處理結(jié)果。如果事務(wù)成功提交,消費(fèi)端會(huì)發(fā)送ACK確認(rèn)消息的處理;否則,事務(wù)回滾,消息將被重新投遞。
public class RabbitMQTransactionDemo {
    private static final String QUEUE_NAME = "transaction_queue";
    public static void main(String[] args) {
        try {
            // 創(chuàng)建連接工廠
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            // 創(chuàng)建連接
            Connection connection = factory.newConnection();
            // 創(chuàng)建信道
            Channel channel = connection.createChannel();
            // 聲明隊(duì)列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            try {
                // 開(kāi)啟事務(wù)
                channel.txSelect();
                // 發(fā)送消息
                String message = "Hello, RabbitMQ!";
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                // 提交事務(wù)
                channel.txCommit();
            } catch (Exception e) {
                // 事務(wù)回滾
                channel.txRollback();
                e.printStackTrace();
            }
            // 關(guān)閉信道和連接
            channel.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

2、RocketMQ的事務(wù)消息

RocketMQ提供了事務(wù)消息的機(jī)制,確保消息的可靠性和一致性。

發(fā)送事務(wù)消息時(shí),需要將消息發(fā)送到半消息隊(duì)列,然后執(zhí)行本地事務(wù)邏輯。

事務(wù)執(zhí)行成功后,通過(guò)調(diào)用"TransactionStatus.CommitTransaction"提交事務(wù)消息;若事務(wù)執(zhí)行失敗,則通過(guò)調(diào)用"TransactionStatus.RollbackTransaction"回滾事務(wù)消息。

事務(wù)消息的最終狀態(tài)由消息生產(chǎn)者根據(jù)事務(wù)執(zhí)行結(jié)果進(jìn)行確認(rèn)。

public class RocketMQTransactionDemo {
    public static void main(String[] args) throws Exception {
        // 創(chuàng)建事務(wù)消息生產(chǎn)者
        TransactionMQProducer producer = new TransactionMQProducer("group_name");
        producer.setNamesrvAddr("localhost:9876");
        // 設(shè)置事務(wù)監(jiān)聽(tīng)器
        producer.setTransactionListener(new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                // 執(zhí)行本地事務(wù)邏輯,根據(jù)業(yè)務(wù)邏輯結(jié)果返回相應(yīng)的狀態(tài)
                // 返回 LocalTransactionState.COMMIT_MESSAGE 表示事務(wù)提交
                // 返回 LocalTransactionState.ROLLBACK_MESSAGE 表示事務(wù)回滾
                // 返回 LocalTransactionState.UNKNOW 表示事務(wù)狀態(tài)未知
            }
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                // 根據(jù)消息的狀態(tài),來(lái)判斷本地事務(wù)的最終狀態(tài)
                // 返回 LocalTransactionState.COMMIT_MESSAGE 表示事務(wù)提交
                // 返回 LocalTransactionState.ROLLBACK_MESSAGE 表示事務(wù)回滾
                // 返回 LocalTransactionState.UNKNOW 表示事務(wù)狀態(tài)未知
            }
        });
        // 啟動(dòng)事務(wù)消息生產(chǎn)者
        producer.start();
        // 構(gòu)造消息
        Message msg = new Message("topic_name", "tag_name", "Hello, RocketMQ!".getBytes());
        // 發(fā)送事務(wù)消息
        TransactionSendResult sendResult = producer.sendMessageInTransaction(msg, null);
        System.out.println("Send Result: " + sendResult);
        // 關(guān)閉事務(wù)消息生產(chǎn)者
        producer.shutdown();
    }
}

3、Kafka的事務(wù)消息

Kafka引入了事務(wù)功能來(lái)確保消息的原子性和一致性。事務(wù)消息的發(fā)送和確認(rèn)在生產(chǎn)者端進(jìn)行。

生產(chǎn)者可以通過(guò)初始化事務(wù),將一系列的消息寫(xiě)入事務(wù),然后通過(guò)"commitTransaction()"提交事務(wù),或者通過(guò)"abortTransaction()"中止事務(wù)。

Kafka會(huì)保證在事務(wù)提交之前,寫(xiě)入的所有消息不會(huì)被消費(fèi)者可見(jiàn),以保持事務(wù)的一致性。

public class KafkaTransactionDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactional_id");
        Producer<String, String> producer = new KafkaProducer<>(props);
        // 初始化事務(wù)
        producer.initTransactions();
        try {
            // 開(kāi)啟事務(wù)
            producer.beginTransaction();
            // 發(fā)送消息
            ProducerRecord<String, String> record = new ProducerRecord<>("topic_name", "Hello, Kafka!");
            producer.send(record);
            // 提交事務(wù)
            producer.commitTransaction();
        } catch (ProducerFencedException e) {
            // 處理異常情況
            producer.close();
        } finally {
            producer.close();
        }
    }
}

消息確認(rèn) ACK機(jī)制

1、RabbitMQ的ACK機(jī)制

RabbitMQ使用ACK(消息確認(rèn))機(jī)制來(lái)確保消息的可靠傳遞。

消費(fèi)者收到消息后,需要向RabbitMQ發(fā)送ACK來(lái)確認(rèn)消息的處理狀態(tài)。

只有在收到ACK后,RabbitMQ才會(huì)將消息標(biāo)記為已成功傳遞,否則會(huì)將消息重新投遞給其他消費(fèi)者或者保留在隊(duì)列中。

以下是RabbitMQ ACK的Java示例:

public class RabbitMQAckDemo {
    public static void main(String[] args) throws Exception {
        // 創(chuàng)建連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        // 創(chuàng)建連接
        Connection connection = factory.newConnection();
        // 創(chuàng)建信道
        Channel channel = connection.createChannel();
        // 聲明隊(duì)列
        String queueName = "queue_name";
        channel.queueDeclare(queueName, false, false, false, null);
        // 創(chuàng)建消費(fèi)者
        String consumerTag = "consumer_tag";
        boolean autoAck = false; // 關(guān)閉自動(dòng)ACK
        // 消費(fèi)消息
        channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 消費(fèi)消息
                String message = new String(body, "UTF-8");
                System.out.println("Received message: " + message);
                try {
                    // 模擬處理消息的業(yè)務(wù)邏輯
                    processMessage(message);
                    // 手動(dòng)發(fā)送ACK確認(rèn)消息
                    long deliveryTag = envelope.getDeliveryTag();
                    channel.basicAck(deliveryTag, false);
                } catch (Exception e) {
                    // 處理消息異常,可以選擇重試或者記錄日志等操作
                    System.out.println("Failed to process message: " + message);
                    e.printStackTrace();
                    // 手動(dòng)發(fā)送NACK拒絕消息,并可選是否重新投遞
                    long deliveryTag = envelope.getDeliveryTag();
                    boolean requeue = true; // 重新投遞消息
                    channel.basicNack(deliveryTag, false, requeue);
                }
            }
        });
    }
    private static void processMessage(String message) {
        // 模擬處理消息的業(yè)務(wù)邏輯
    }
}

2、RocketMQ的ACK機(jī)制

RocketMQ的ACK機(jī)制由消費(fèi)者控制,消費(fèi)者從消息隊(duì)列中消費(fèi)消息后,可以手動(dòng)發(fā)送ACK確認(rèn)消息的處理狀態(tài)。

只有在收到ACK后,RocketMQ才會(huì)將消息標(biāo)記為已成功消費(fèi),否則會(huì)將消息重新投遞給其他消費(fèi)者。

public class RocketMQAckDemo {
    public static void main(String[] args) throws Exception {
        // 創(chuàng)建消費(fèi)者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_name");
        consumer.setNamesrvAddr("localhost:9876");
        // 訂閱消息
        consumer.subscribe("topic_name", "*");
        // 注冊(cè)消息監(jiān)聽(tīng)器
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            for (MessageExt message : msgs) {
                try {
                    // 消費(fèi)消息
                    String msgBody = new String(message.getBody(), "UTF-8");
                    System.out.println("Received message: " + msgBody);
                    // 模擬處理消息的業(yè)務(wù)邏輯
                    processMessage(msgBody);
                    // 手動(dòng)發(fā)送ACK確認(rèn)消息
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } catch (Exception e) {
                    // 處理消息異常,可以選擇重試或者記錄日志等操作
                    System.out.println("Failed to process message: " + new String(message.getBody()));
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        // 啟動(dòng)消費(fèi)者
        consumer.start();
    }
    private static void processMessage(String message) {
        // 模擬處理消息的業(yè)務(wù)邏輯
    }
}

3、Kafka的ACK機(jī)制

Kafka的ACK機(jī)制用于控制生產(chǎn)者在發(fā)送消息后,需要等待多少個(gè)副本確認(rèn)才視為消息發(fā)送成功。

這個(gè)機(jī)制可以通過(guò)設(shè)置acks參數(shù)來(lái)進(jìn)行配置。在Kafka中,acks參數(shù)有三個(gè)可選值:

acks=0:生產(chǎn)者在發(fā)送消息后不需要等待任何確認(rèn),直接將消息發(fā)送給Kafka集群。這種方式具有最高的吞吐量,但是也存在數(shù)據(jù)丟失的風(fēng)險(xiǎn),因?yàn)樯a(chǎn)者不會(huì)知道消息是否成功發(fā)送給任何副本。

acks=1:生產(chǎn)者在發(fā)送消息后只需要等待首領(lǐng)副本(leader replica)確認(rèn)。一旦首領(lǐng)副本成功接收到消息,生產(chǎn)者就會(huì)收到確認(rèn)。這種方式提供了一定的可靠性,但是如果首領(lǐng)副本在接收消息后但在確認(rèn)之前發(fā)生故障,仍然可能會(huì)導(dǎo)致數(shù)據(jù)丟失。

acks=all:生產(chǎn)者在發(fā)送消息后需要等待所有副本都確認(rèn)。只有當(dāng)所有副本都成功接收到消息后,生產(chǎn)者才會(huì)收到確認(rèn)。這是最安全的確認(rèn)機(jī)制,確保了消息不會(huì)丟失,但是需要更多的時(shí)間和資源。acks=-1與acks=all是等效的。

public classKafkaProducerDemo{
    public static void main(String[]args){
        // 配置Kafka生產(chǎn)者的參數(shù)
        Propertiesprops=newProperties();
        props.put("bootstrap.servers","localhost:9092");// Kafka集群的地址和端口
        props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");// 鍵的序列化器
        props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");// 值的序列化器
        props.put("acks","all");// 設(shè)置ACK機(jī)制為所有副本都確認(rèn)
        // 創(chuàng)建生產(chǎn)者實(shí)例
        KafkaProducer<String,String>producer=newKafkaProducer<>(props);
        // 構(gòu)造消息
        Stringtopic="my_topic";
        Stringkey="my_key";
        Stringvalue="Hello, Kafka!";
        // 創(chuàng)建消息記錄
        ProducerRecord<String,String>record=newProducerRecord<>(topic,key,value);
        // 發(fā)送消息
        producer.send(record,newCallback(){
            @Override
            publicvoidonCompletion(RecordMetadatametadata,Exceptionexception){
                if(exception!=null){
                    System.err.println("發(fā)送消息出現(xiàn)異常:"+exception.getMessage());
                }else{
                    System.out.println("消息發(fā)送成功!位于分區(qū) "+metadata.partition()+",偏移量 "+metadata.offset());
                }
            }
        });
        // 關(guān)閉生產(chǎn)者
        producer.close();
    }
}

延遲消息實(shí)現(xiàn)

延遲隊(duì)列在實(shí)際項(xiàng)目中有非常多的應(yīng)用場(chǎng)景,最常見(jiàn)的比如訂單未支付,超時(shí)取消訂單,在創(chuàng)建訂單的時(shí)候發(fā)送一條延遲消息,達(dá)到延遲時(shí)間之后消費(fèi)者收到消息,如果訂單沒(méi)有支付的話,那么就取消訂單。


image-20250508192415995image-20250508192415995

1、RocketMQ實(shí)現(xiàn)延遲消息

RocketMQ 默認(rèn)時(shí)間間隔分為 18 個(gè)級(jí)別,基本上也能滿足大部分場(chǎng)景的需要了。

默認(rèn)延遲級(jí)別:

1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h

使用起來(lái)也非常的簡(jiǎn)單,直接通過(guò)setDelayTimeLevel設(shè)置延遲級(jí)別即可。

setDelayTimeLevel(level)

實(shí)現(xiàn)原理說(shuō)起來(lái)比較簡(jiǎn)單,Broker 會(huì)根據(jù)不同的延遲級(jí)別創(chuàng)建出多個(gè)不同級(jí)別的隊(duì)列,當(dāng)我們發(fā)送延遲消息的時(shí)候,根據(jù)不同的延遲級(jí)別發(fā)送到不同的隊(duì)列中,同時(shí)在 Broker 內(nèi)部通過(guò)一個(gè)定時(shí)器去輪詢這些隊(duì)列(RocketMQ 會(huì)為每個(gè)延遲級(jí)別分別創(chuàng)建一個(gè)定時(shí)任務(wù)),如果消息達(dá)到發(fā)送時(shí)間,那么就直接把消息發(fā)送到指 topic 隊(duì)列中。

RocketMQ 這種實(shí)現(xiàn)方式是放在服務(wù)端去做的,同時(shí)有個(gè)好處就是相同延遲時(shí)間的消息是可以保證有序性的。

談到這里就順便提一下關(guān)于消息消費(fèi)重試的原理,這個(gè)本質(zhì)上來(lái)說(shuō)其實(shí)是一樣的,對(duì)于消費(fèi)失敗需要重試的消息實(shí)際上都會(huì)被丟到延遲隊(duì)列的 topic 里,到期后再轉(zhuǎn)發(fā)到真正的 topic 中。


image-20250508192539070image-20250508192539070

2、RabbitMQ實(shí)現(xiàn)延遲消息

RabbitMQ本身并不存在延遲隊(duì)列的概念,在 RabbitMQ 中是通過(guò) DLX 死信交換機(jī)和 TTL 消息過(guò)期來(lái)實(shí)現(xiàn)延遲隊(duì)列的。

TTL(Time to Live)過(guò)期時(shí)間

有兩種方式可以設(shè)置 TTL。

(1) 通過(guò)隊(duì)列屬性設(shè)置,這樣的話隊(duì)列中的所有消息都會(huì)擁有相同的過(guò)期時(shí)間(2) 對(duì)消息單獨(dú)設(shè)置過(guò)期時(shí)間,這樣每條消息的過(guò)期時(shí)間都可以不同

那么如果同時(shí)設(shè)置呢?這樣將會(huì)以兩個(gè)時(shí)間中較小的值為準(zhǔn)。

針對(duì)隊(duì)列的方式通過(guò)參數(shù)x-message-ttl來(lái)設(shè)置。

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 6000);
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);

針對(duì)消息的方式通過(guò)setExpiration來(lái)設(shè)置。

AMQP.BasicProperties properties = new AMQP.BasicProperties();
Properties.setDeliveryMode(2);
properties.setExpiration("60000");
channel.basicPublish(exchangeName, routingKey, mandatory, properties, "message".getBytes());

DLX(Dead Letter Exchange)死信交換機(jī)

一個(gè)消息要成為死信消息有 3 種情況:

(1) 消息被拒絕,比如調(diào)用reject方法,并且需要設(shè)置requeuefalse

(2) 消息過(guò)期

(3) 隊(duì)列達(dá)到最大長(zhǎng)度

可以通過(guò)參數(shù)dead-letter-exchange設(shè)置死信交換機(jī),也可以通過(guò)參數(shù)dead-letter- exchange指定 RoutingKey(未指定則使用原隊(duì)列的 RoutingKey)。

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "exchange.dlx");
args.put("x-dead-letter-routing-key", "routingkey");
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);

實(shí)現(xiàn)原理

當(dāng)我們對(duì)消息設(shè)置了 TTL 和 DLX 之后,當(dāng)消息正常發(fā)送,通過(guò) Exchange 到達(dá) Queue 之后,由于設(shè)置了 TTL 過(guò)期時(shí)間,并且消息沒(méi)有被消費(fèi)(訂閱的是死信隊(duì)列),達(dá)到過(guò)期時(shí)間之后,消息就轉(zhuǎn)移到與之綁定的 DLX 死信隊(duì)列之中。

這樣的話,就相當(dāng)于通過(guò) DLX 和 TTL 間接實(shí)現(xiàn)了延遲消息的功能,實(shí)際使用中我們可以根據(jù)不同的延遲級(jí)別綁定設(shè)置不同延遲時(shí)間的隊(duì)列來(lái)達(dá)到實(shí)現(xiàn)不同延遲時(shí)間的效果。

如果隊(duì)列通過(guò) dead-letter-exchange 屬性指定了一個(gè)交換機(jī),那么該隊(duì)列中的死信就會(huì)投遞到這個(gè)交換機(jī)中,這個(gè)交換機(jī)稱為死信交換機(jī)(Dead Letter Exchange,簡(jiǎn)稱DLX)。


image-20250508192505250image-20250508192505250

3、Kafka實(shí)現(xiàn)延遲消息

對(duì)于 Kafka 來(lái)說(shuō),原生并不支持延遲隊(duì)列的功能,需要我們手動(dòng)去實(shí)現(xiàn),這里我根據(jù) RocketMQ 的設(shè)計(jì)提供一個(gè)實(shí)現(xiàn)思路。

這個(gè)設(shè)計(jì),我們也不支持任意時(shí)間精度的延遲消息,只支持固定級(jí)別的延遲,因?yàn)閷?duì)于大部分延遲消息的場(chǎng)景來(lái)說(shuō)足夠使用了。

只創(chuàng)建一個(gè) topic,但是針對(duì)該 topic 創(chuàng)建 18 個(gè) partition,每個(gè) partition 對(duì)應(yīng)不同的延遲級(jí)別,這樣做和 RocketMQ 一樣有個(gè)好處就是能達(dá)到相同延遲時(shí)間的消息達(dá)到有序性。

應(yīng)用級(jí) Kafka 延遲消息實(shí)現(xiàn)原理

首先創(chuàng)建一個(gè)單獨(dú)針對(duì)延遲隊(duì)列的 topic,同時(shí)創(chuàng)建 18 個(gè) partition 針對(duì)不同的延遲級(jí)別。

發(fā)送消息的時(shí)候根據(jù)延遲參數(shù)發(fā)送到延遲 topic 對(duì)應(yīng)的 partition,對(duì)應(yīng)的key為延遲時(shí)間,同時(shí)把原 topic 保存到 header 中。

ProducerRecord<Object, Object> producerRecord = new ProducerRecord<>("delay_topic", delayPartition, delayTime, data);
producerRecord.headers().add("origin_topic", topic.getBytes(StandardCharsets.UTF_8));

內(nèi)嵌的consumer單獨(dú)設(shè)置一個(gè)ConsumerGroup去消費(fèi)延遲 topic 消息,消費(fèi)到消息之后如果沒(méi)有達(dá)到延遲時(shí)間那么就進(jìn)行pause,然后seek到當(dāng)前ConsumerRecordoffset位置,同時(shí)使用定時(shí)器去輪詢延遲的TopicPartition,達(dá)到延遲時(shí)間之后進(jìn)行resume

如果達(dá)到了延遲時(shí)間,那么就獲取到header中的真實(shí) topic ,直接轉(zhuǎn)發(fā)。

這里為什么要進(jìn)行pauseresume呢?

因?yàn)槿绻贿@樣的話,如果超時(shí)未消費(fèi)達(dá)到max.poll.interval.ms最大時(shí)間(默認(rèn)300s),那么將會(huì)觸發(fā) Rebalance。

責(zé)任編輯:武曉燕 來(lái)源: 技術(shù)自由圈
相關(guān)推薦

2009-02-16 17:21:46

2023-09-19 08:09:21

RabbitMQRocketMQKafka

2025-02-27 08:50:00

RocketMQ開(kāi)發(fā)代碼

2025-05-26 02:15:00

2017-05-05 10:15:38

深度學(xué)習(xí)框架對(duì)比分析

2010-05-13 13:27:23

2012-11-19 11:30:40

PowerShell常見(jiàn)問(wèn)題解決方法

2010-06-08 11:15:43

OpenSUSE Ub

2011-11-23 16:28:07

JavaSpring框架

2018-04-23 09:50:54

2018-05-10 12:55:51

大數(shù)據(jù)對(duì)比分析面試

2010-06-12 15:36:01

2024-08-22 14:49:49

系統(tǒng)設(shè)計(jì)數(shù)據(jù)庫(kù)

2015-09-22 10:14:57

虛擬化虛擬化問(wèn)題

2021-08-24 07:57:26

KafkaRocketMQPulsar

2010-08-06 16:15:57

Flex通信

2024-01-09 15:37:46

2010-07-20 16:16:21

SDH

2011-04-08 13:58:52

JavaJSP

2011-05-30 15:12:46

電纜雙絞線布線
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)

主站蜘蛛池模板: 三级在线视频 | 男女爱爱福利视频 | 欧美韩一区二区 | 久久国产精品一区二区三区 | 免费三级av | 午夜精品久久久久久久星辰影院 | 久久综合一区二区三区 | 欧美日韩在线成人 | 一区二区视频在线 | 精品久久99| 伊人网综合在线观看 | 365夜爽爽欧美性午夜免费视频 | 91视频18 | 99精品一级欧美片免费播放 | 国产精品自拍啪啪 | 国产在线一区二区三区 | 黄久久久 | 超碰人人人人 | 九九免费在线视频 | 国产精品一区二区在线 | 日韩欧美网 | 成年免费大片黄在线观看岛国 | 日韩一区二区三区精品 | 久久男女视频 | 国产精品久久午夜夜伦鲁鲁 | 国产精品99久久久久久人 | 国产精品3区 | 欧美激情国产精品 | 欧美久久一区二区 | 你懂的国产| 黄色国产大片 | 精品亚洲一区二区三区四区五区高 | 一本一道久久a久久精品综合蜜臀 | 成人在线免费电影 | 九九热这里只有精品在线观看 | 91免费高清视频 | 中文字幕免费中文 | 欧美一区二区三区精品免费 | 91免费看片神器 | 狠狠伊人 | 在线看黄免费 |