成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

消息隊(duì)列 RocketMQ 入門指南

開發(fā)
本文我們將從RocketMQ的基本概念出發(fā),逐步講解其核心功能,并通過簡單的實(shí)踐示例,幫助你快速上手。

在當(dāng)今的分布式系統(tǒng)中,消息隊(duì)列(Message Queue)作為解耦、異步通信和流量削峰的重要組件,扮演著不可或缺的角色。而RocketMQ,作為阿里巴巴開源的一款高性能、高可靠的分布式消息中間件,憑借其強(qiáng)大的功能和穩(wěn)定的性能,成為了眾多開發(fā)者和企業(yè)的首選。

無論你是剛剛接觸消息隊(duì)列的新手,還是希望深入了解RocketMQ的開發(fā)者,這篇文章都將為你提供一個(gè)清晰的入門指南。我們將從RocketMQ的基本概念出發(fā),逐步講解其核心功能,并通過簡單的實(shí)踐示例,幫助你快速上手。

一、詳解RocketMQ基礎(chǔ)概念

1. 為什么要用RocketMQ

相比于市場上的各種消息隊(duì)列,它有如下優(yōu)勢:

  • 性能好。
  • 穩(wěn)定可靠。
  • 中文社區(qū)活躍。

當(dāng)然缺點(diǎn)也是有那么一些些的,兼容性確實(shí)不太行。

2. RocketMQ優(yōu)缺點(diǎn)是什么

優(yōu)點(diǎn):

  • 單機(jī)吞吐量為10w級(jí)。
  • 可用性很高,支持分布式架構(gòu)。
  • 擴(kuò)展性好。
  • 支持10億級(jí)別的消息堆積,而且不會(huì)因?yàn)槎逊e導(dǎo)致性能下降。
  • 源碼是用Java寫的,對于Java程序員來說非常方便改造。
  • 參數(shù)優(yōu)化配置,消息基本可以做到0丟失。
  • 使用于對可靠性要求高的金融行業(yè)。

缺點(diǎn):

  • 目前只支持Java、C++客戶端,而且C++還不算完善。
  • 沒有在MQ核心實(shí)現(xiàn)JMS相關(guān)接口,有些遷移改造就比較麻煩了。

3. 消息隊(duì)列使用場景

解耦: 例如用戶完成下單除了必要的庫存扣減和訂單狀態(tài)更新外,我們還需要處理一些積分系統(tǒng)、推送系統(tǒng)的無關(guān)緊要的業(yè)務(wù)處理,如果全部順序執(zhí)行,等待時(shí)間就會(huì)變得很漫長,所以我們需要借助MQ將邊角業(yè)務(wù)從業(yè)務(wù)模塊中解耦開來。

  • 異步: 這點(diǎn)不必多說,上述的解耦方案就會(huì)使得積分系統(tǒng)、促銷系統(tǒng)、推送系統(tǒng)任務(wù)異步執(zhí)行。
  • 削峰: 可以理解為一個(gè)漏斗,例如我們的某個(gè)服務(wù)只能抗住10wQPS,可是當(dāng)前請求卻達(dá)到20w的QPS,那么我們就可以將請求全部先扔到MQ中,讓服務(wù)慢慢消化處理。

二、RocketMQ基礎(chǔ)安裝與實(shí)踐

1. 安裝并啟動(dòng)RocketMQ

在編寫業(yè)務(wù)代碼之前,我們必須完成一下RocketMQ的部署,首先我們自然要下載一下RocketMQ,下載地址如下,筆者下載的是rocketmq-all-4.8.0-bin-release這個(gè)版本:https://rocketmq.apache.org/download/

完成完成后,我們將其解壓到自定義的路徑,鍵入sudo vim /etc/profile配置MQ環(huán)境變量,完成后鍵入source /etc/profile使之生效,對應(yīng)的配置內(nèi)容如下所示:

export ROCKETMQ_HOME=/home/sharkchili/rocketmq-all-4.8.0-bin-release
export PATH=$PATH:$ROCKETMQ_HOME/bin

需要注意的是筆者本次采用WSL的Ubuntu子系統(tǒng)時(shí)啟動(dòng)時(shí)腳本會(huì)拋出runserver.sh: 70: [[: Exec format error錯(cuò)誤,嘗試格式化和指令配置后都沒有很好的解決,于是循著報(bào)錯(cuò)找到runserver.sh這行對應(yīng)的腳本內(nèi)容,該括弧本質(zhì)上就是基于JDK內(nèi)容配置對應(yīng)的GC算法:

以筆者為里系統(tǒng)是jdk8,所以直接去掉判斷用走JDK8的配置即可:

choose_gc_options()
{

      JAVA_OPT="${JAVA_OPT} -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFractinotallow=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:-UseParNewGC"
      JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${GC_LOG_DIR}/rmq_srv_gc_%p_%t.log -XX:+PrintGCDetails"
      JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
 
}

完成后鍵入./mqnamesrv &將MQ啟動(dòng),如果彈窗輸出下面這條結(jié)果,則說明mq的NameServer啟動(dòng)成功。

Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
The Name Server boot success. serializeType=JSON

然后我們再鍵入./mqbroker -n 127.0.0.1:9876啟動(dòng)broker,需要注意的是默認(rèn)情況下broker占用堆內(nèi)存差不多是4g,所以讀者本地部署時(shí)建議修改一下runbroker.sh的堆內(nèi)存,如下圖所示:

若彈窗輸出下面所示的文字,則說明broker啟動(dòng)成功,自此mq就在windows環(huán)境部署成功了。我們就可以開始編碼工作了。

The broker[DESKTOP-BI4ATFQ, 192.168.237.1:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876

2. 訂單系統(tǒng)改造

本次的示例是關(guān)于訂單系統(tǒng)改造,用戶下單完成后,服務(wù)器需要進(jìn)行庫存扣減、訂單狀態(tài)更新、以及優(yōu)惠券、積分等邊邊角角的業(yè)務(wù),如果順序執(zhí)行這些邏輯+網(wǎng)絡(luò)開銷,接口耗時(shí)對于用戶體驗(yàn)是非常不友好的。

所以我們在將非核心業(yè)務(wù)邏輯從接口串行調(diào)用中抽出,下單業(yè)務(wù)只需關(guān)注完成我們庫存扣減、訂單狀態(tài)更新就行了,剩下的業(yè)務(wù)用MQ發(fā)個(gè)消息給積分系統(tǒng)、促銷系統(tǒng)告知他們自己處理一下就行了:

首先我們引入MQ依賴腳手架:

<dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.1.1</version>
        </dependency>

同時(shí)這里我們也給出配置信息:

# mq地址端口
rocketmq.name-server=127.0.0.1:9876
# 生產(chǎn)者配置
rocketmq.producer.isOnOff=on
# 發(fā)送同一類消息設(shè)置為同一個(gè)group,保證唯一
rocketmq.producer.group=rocketmq-group
rocketmq.producer.groupName=rocketmq-group
# namesrv地址
rocketmq.producer.namesrvAddr=127.0.0.1:9876
# 設(shè)置消息最大長度 4M
rocketmq.producer.maxMessageSize=4096
# 消息發(fā)送超時(shí)時(shí)間
rocketmq.producer.sendMsgTimeout=3000
# 消息發(fā)送失敗重試次數(shù)
rocketmq.producer.retryTimesWhenSendFailed=2

隨后我們設(shè)置監(jiān)聽處理關(guān)于訂單創(chuàng)建的topic消息:

@Component
@RocketMQMessageListener(consumerGroup = "${rocketmq.producer.groupName}", topic = "ORDER_ADD")
@Slf4j
public class OrderMsgListener implements RocketMQListener<Order> {
    @Override
    public void onMessage(Order order) {
        log.info("收到訂單,訂單信息:[{}],進(jìn)行積分系統(tǒng)、促銷系統(tǒng)、推送系統(tǒng)業(yè)務(wù)處理.....", JSONUtil.toJsonStr(order));
    }
}

完成后我們基于CommandLineRunner 測試一下消息發(fā)送:

@Component
@Slf4j
public class MQTest implements CommandLineRunner {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;


    @Override
    public void run(String... args) throws Exception {

        Order order = new Order();
        order.setOrderNo("20221217001002003");
        order.setUserId(1);
        order.setPrice(500.00);

        rocketMQTemplate.asyncSend("ORDER_ADD", MessageBuilder.withPayload(order).build(), getDefaultSendCallBack());

    }

    /**
     * 消息處理默認(rèn)回調(diào)
     * @return
     */
    private SendCallback getDefaultSendCallBack() {
        return new SendCallback() {

            @Override
            public void onSuccess(org.apache.rocketmq.client.producer.SendResult sendResult) {
                log.info("MQ消息發(fā)送成功。result:{}", JSONUtil.toJsonStr(sendResult));
            }

            @Override
            public void onException(Throwable throwable) {
                log.error("MQ消息發(fā)送失敗,失敗原因:{}" + throwable.getMessage());
            }
        };
    }

}

日志如下,可以看到消息消費(fèi)成功了:

2025-02-11 10:03:14.577  INFO 14420 --- [MessageThread_1] com.sharkChili.config.OrderMsgListener   : 收到訂單,訂單信息:[{"userId":1,"orderNo":"20221217001002003","price":500}],進(jìn)行積分系統(tǒng)、促銷系統(tǒng)、推送系統(tǒng)業(yè)務(wù)處理.....
2025-02-11 10:03:14.577  INFO 14420 --- [ublicExecutor_2] com.sharkChili.runner.MQTest             : MQ消息發(fā)送成功。result:{"sendStatus":"SEND_OK","msgId":"AC1E1001385418B4AAC235A7E0190000","messageQueue":{"topic":"ORDER_ADD","brokerName":"DESKTOP-DC9PSUS","queueId":2},"queueOffset":1,"offsetMsgId":"AC15733800002A9F0000000000000558","regionId":"DefaultRegion","traceOn":true}

3. 如何實(shí)現(xiàn)消息過濾

設(shè)置tag消息的方式常見的是有兩種,一種是基于tag標(biāo)簽過濾,如下代碼所示,我們希望發(fā)送訂單業(yè)務(wù)即ORDER_ADD這個(gè)主題下tag標(biāo)簽為tagA的用戶收到消息,那么我們就可以通過ORDER_ADD:tagA針對topic進(jìn)行更進(jìn)一步劃分:

//創(chuàng)建訂單消息
        Order order = new Order();
        order.setUserId(1);
        order.setOrderNo(UUID.randomUUID().toString());
        order.setPrice(500);
        //生成消息
        Message<Order> message = MessageBuilder.withPayload(order)
                .build();
        //同步發(fā)送
        rocketMQTemplate.syncSend("ORDER_ADD:tagA", message);

對應(yīng)的監(jiān)聽者通過selectorExpression 指定標(biāo)簽即可:

@Component
@RocketMQMessageListener(consumerGroup = "${rocketmq.producer.groupName}",
        topic = "ORDER_ADD",
        selectorExpression = "tagA"http://訂閱tagA的消息
)
@Slf4j
public class OrderMsgListener implements RocketMQListener<Order> {
    @Override
    public void onMessage(Order order) {
        log.info("收到訂單,訂單信息:[{}],進(jìn)行積分系統(tǒng)、促銷系統(tǒng)、推送系統(tǒng)業(yè)務(wù)處理.....", JSONUtil.toJsonStr(order));
    }
}

還有一種就是基于SQL過濾,因?yàn)楸磉_(dá)式靈活,相對更強(qiáng)大一些,例如我們的消費(fèi)者只處理userId為10以內(nèi)的數(shù)據(jù),那么消費(fèi)者的監(jiān)聽就可以按照如下姿勢進(jìn)行配置:

@Component
@RocketMQMessageListener(consumerGroup = "${rocketmq.producer.groupName}",
        topic = "ORDER_ADD",
        selectorType = SelectorType.SQL92,//指令類型為sql表達(dá)式
        selectorExpression = "userId<10"http://過濾出id小于10的用戶的訂單
)
@Slf4j
public class OrderMsgListener implements RocketMQListener<Order> {
    @Override
    public void onMessage(Order order) {
        log.info("收到訂單,訂單信息:[{}],進(jìn)行積分系統(tǒng)、促銷系統(tǒng)、推送系統(tǒng)業(yè)務(wù)處理.....", JSONUtil.toJsonStr(order));
    }
}

發(fā)送消息時(shí),通過headers 指定本消息條件并通過convertAndSend發(fā)送即可:

//創(chuàng)建訂單消息
        Order order = new Order();
        order.setUserId(1);
        order.setOrderNo(UUID.randomUUID().toString());
        order.setPrice(500);
        //通過header攜帶條件告知當(dāng)前userId為1
        Map<String, Object> headers = new HashMap<>();
        headers.put("userId", 1);
        //生成消息
        Message<Order> message = MessageBuilder.withPayload(order)
                .build();
        //發(fā)送
        rocketMQTemplate.convertAndSend("ORDER_ADD", message, headers);

需要注意的是默認(rèn)情況下,MQ是不支持SQL表達(dá)過濾,我們需要到conf目錄下的broker.conf文件,添加enablePropertyFilter=true,然后鍵入如下指令降broker啟動(dòng):

./mqbroker -n 127.0.0.1:9876 autoCreateTopicEnable=true -c ../conf/broker.conf

4. 如何提交延時(shí)消息

延遲消息即需要消費(fèi)者過一段時(shí)間后才能消費(fèi)的消息,例如我們現(xiàn)在有個(gè)消息要求消費(fèi)者10s后才能消費(fèi),那么我們就可以使用延遲消息,如下代碼所示:

// 創(chuàng)建延遲消息
        Message<String> rocketMessage = MessageBuilder.withPayload("this is delay msg").build();
        // 發(fā)送延遲消息,timeout設(shè)置為10000即10s,delayLevel表示延遲等級(jí),1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,所以 3為10s
        rocketMQTemplate.syncSend("delay_topic", rocketMessage, 10000,3);
        log.info("延遲消息發(fā)送完成");

消費(fèi)者代碼:

@Component
@RocketMQMessageListener(consumerGroup = "delay_msg_group", topic = "delay_topic")
@Slf4j
public class DelayMsgListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String msg) {
        log.info("收到延遲消息,消息內(nèi)容:{}", JSONUtil.toJsonStr(msg));
    }
}

輸出結(jié)果,可以看到確實(shí)是10s后消費(fèi)者采納看到消息并消費(fèi):

2025-02-11 10:56:58.300  INFO 18568 --- [           main] com.sharkChili.runner.MQTest             : 延遲消息發(fā)送完成
2025-02-11 10:57:08.307  INFO 18568 --- [MessageThread_1] com.sharkChili.config.DelayMsgListener   : 收到延遲消息,消息內(nèi)容:this is delay msg
責(zé)任編輯:趙寧寧 來源: 寫代碼的SharkChili
相關(guān)推薦

2022-09-21 21:50:18

Dapr消息隊(duì)列

2017-07-11 15:26:57

LocalMQ RocketMQ高性能

2024-10-08 08:52:59

2017-10-11 15:08:28

消息隊(duì)列常見

2024-11-11 13:28:11

RocketMQ消息類型FIFO

2024-10-29 08:34:27

RocketMQ消息類型事務(wù)消息

2025-06-04 01:35:00

RocketMQ異步消息

2022-12-22 10:03:18

消息集成

2023-09-18 08:27:20

RabbitMQRocketMQKafka

2023-07-18 09:03:01

RocketMQ場景消息

2022-06-02 08:21:07

RocketMQ消息中間件

2023-07-17 08:34:03

RocketMQ消息初體驗(yàn)

2023-08-17 10:20:18

RabbitMQ系統(tǒng)

2023-11-20 09:33:43

開發(fā)指南

2023-12-21 08:01:41

RocketMQ消息堆積

2022-03-31 08:26:44

RocketMQ消息排查

2024-09-13 08:49:45

2021-04-07 08:43:09

SpringBootRocketMQ開發(fā)技術(shù)

2020-11-13 16:40:05

RocketMQ延遲消息架構(gòu)

2024-09-25 08:32:05

點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)

主站蜘蛛池模板: 亚洲激情综合网 | 亚洲精品91 | av一区二区三区四区 | 91麻豆精品国产91久久久久久久久 | 久久久成人一区二区免费影院 | 亚洲 精品 综合 精品 自拍 | 色婷婷av一区二区三区软件 | 亚洲国产精品一区二区三区 | 欧美看片| 免费成人在线网站 | www.成人.com | 国产在线二区 | 国产精品久久久久久久久久妇女 | 91麻豆精品国产91久久久久久 | 久久亚洲高清 | 国产精品高潮呻吟久久av野狼 | 成年免费大片黄在线观看岛国 | 不卡一二区 | 国产精品毛片 | 日韩成人免费视频 | 亚洲国产精品一区二区三区 | 国产激情在线观看视频 | 免费黄色a级毛片 | 一区二区三区影院 | 日韩和的一区二在线 | 国产成人精品综合 | 久久成人一区 | 欧美理论片在线观看 | 99久久免费观看 | 中文字幕在线播放第一页 | 欧美videosex性极品hd | 欧美一区二 | 免费人成在线观看网站 | 91在线电影| 色妞av| 国产在线观看一区二区 | 亚洲a毛片 | 亚洲美女一区 | www.久久 | 大陆一级毛片免费视频观看 | 北条麻妃一区二区三区在线视频 |