成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

SpringBoot 集成 RocketMQ:異步消息隊列實戰,讓系統飛起來!

開發 前端
本文將手把手教你如何在 SpringBoot 中集成 RocketMQ,實現完整的異步消息處理流程!

引言:為什么需要異步消息隊列?

在現代高并發系統中,解耦服務、削峰填谷、異步處理已成為架構設計的核心需求。RocketMQ 作為阿里巴巴開源的分布式消息中間件,憑借其高吞吐、低延遲、高可用的特性,成為企業級應用的首選解決方案。

本文將手把手教你如何在 SpringBoot 中集成 RocketMQ,實現完整的異步消息處理流程!

一、環境準備與項目搭建

1. 技術棧

  • JDK 1.8+
  • SpringBoot 2.7.x
  • RocketMQ 4.9.3
  • RocketMQ-Spring-Boot-Starter 2.2.2

2. 添加依賴

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.2</version>
</dependency>

二、核心實現:生產者與消費者

1. 配置文件 (application.yml)

rocketmq:
  name-server: 127.0.0.1:9876  # RocketMQ NameServer地址
  producer:
    group: order_producer_group # 生產者組名
    send-message-timeout: 3000  # 發送超時時間(ms)

2. 消息生產者服務

@Service
@RequiredArgsConstructor
public class OrderProducer {
    private final RocketMQTemplate rocketMQTemplate;


    // 發送普通消息
    public void sendOrderMessage(Order order) {
        Message<Order> message = MessageBuilder.withPayload(order)
                .setHeader(RocketMQHeaders.KEYS, order.getOrderId())
                .build();


        rocketMQTemplate.send("order_topic", message);
        log.info("訂單消息已發送: {}", order);
    }


    // 發送延遲消息(30秒后消費)
    public void sendDelayMessage(Order order) {
        rocketMQTemplate.syncSend("delay_order_topic", 
            MessageBuilder.withPayload(order).build(),
            2000,  // 發送超時
            3      // 延遲級別(對應30秒)
        );
        log.info("延遲訂單消息已發送: {}", order);
    }
}

3. 消息消費者服務

@Slf4j
@Service
@RocketMQMessageListener(
    topic = "order_topic",
    consumerGroup = "order_consumer_group",
    selectorType = SelectorType.TAG,
    selectorExpression = "normal || vip"  // 過濾標簽
)
public class OrderConsumer implements RocketMQListener<Order> {


    @Override
    public void onMessage(Order order) {
        log.info("收到訂單消息,開始處理: {}", order);
        // 業務處理邏輯...
        processOrder(order);
    }


    private void processOrder(Order order) {
        // 模擬業務處理
        log.info("訂單處理完成: {}", order.getOrderId());
    }
}

4. 訂單實體類

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Order implements Serializable {
    private String orderId;
    private BigDecimal amount;
    private LocalDateTime createTime;
    private String userId;
    private String orderType; // 用于消息過濾
}

三、高級特性實現

1. 事務消息處理(解決分布式事務)

@Slf4j
@Service
@RocketMQTransactionListener
public class OrderTransactionListenerImpl implements RocketMQLocalTransactionListener {


    @Autowired
    private OrderService orderService;


    // 執行本地事務
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            Order order = (Order) msg.getPayload();
            orderService.createOrder(order); // 本地數據庫操作
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            log.error("本地事務執行失敗", e);
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }


    // 本地事務狀態回查
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        String orderId = msg.getHeaders().get("orderId").toString();
        return orderService.checkOrderExists(orderId) ? 
            RocketMQLocalTransactionState.COMMIT : 
            RocketMQLocalTransactionState.ROLLBACK;
    }
}

2. 消息重試與死信隊列

// 消費者配置重試策略
@RocketMQMessageListener(
    topic = "important_order_topic",
    consumerGroup = "important_order_group",
    maxReconsumeTimes = 3  // 最大重試次數
)
public class ImportantOrderConsumer implements RocketMQListener<Order> {


    @Override
    public void onMessage(Order order) {
        try {
            processImportantOrder(order);
        } catch (Exception e) {
            throw new RuntimeException("處理失敗,觸發重試");
        }
    }


    // 達到最大重試次數后,消息進入死信隊列
    // 死信隊列命名: %DLQ%consumerGroup
}

四、性能優化技巧

1. 批量消息發送 - 提升吞吐量

List<Message<Order>> messages = orders.stream()
    .map(order -> new Message<>("batch_order_topic", order))
    .collect(Collectors.toList());


SendResult result = rocketMQTemplate.syncSend("batch_order_topic", messages, 3000);

2. 消費端并發配置

@RocketMQMessageListener(
    topic = "high_concurrency_topic",
    consumerGroup = "high_concurrency_group",
    consumeThreadNumber = 32,  // 消費線程數
    consumeTimeout = 15L       // 消費超時(分鐘)
)

3. 消息過濾優化 - 使用SQL表達式

@RocketMQMessageListener(
    topic = "filtered_order_topic",
    consumerGroup = "filtered_order_group",
    selectorType = SelectorType.SQL92,
    selectorExpression = "amount > 100 AND userId LIKE 'VIP%'"
)

五、部署與監控

1. RocketMQ集群部署建議

圖片圖片

2. 監控方案

  • RocketMQ控制臺:實時查看隊列情況
  • Prometheus + Grafana:監控關鍵指標

消息堆積量

發送/消費TPS

消費延遲

  • 日志監控:ELK收集分析日志

結語

異步消息隊列是構建高并發系統的基石,合理使用RocketMQ能讓你的系統在流量洪峰面前游刃有余!
責任編輯:武曉燕 來源: 小林聊編程
相關推薦

2019-03-25 08:05:35

Elasticsear優化集群

2020-09-29 07:54:05

Express 飛起

2011-04-13 10:51:58

MATLAB

2022-10-09 18:14:31

訂單系統分庫分表

2025-03-28 03:20:00

MySQL數據庫搜索

2019-11-05 10:35:57

SpringBoot調優Java

2025-06-26 02:11:00

2025-06-26 02:15:00

2025-01-17 09:23:31

2021-07-13 07:52:03

SQL面試COUNT(*)

2016-01-19 17:03:59

數據中心網絡華為

2025-04-15 00:00:00

2013-01-07 09:34:43

CodeLoveBAT

2011-02-25 08:39:11

QFabric數據中心Juniper

2024-11-27 09:46:34

2023-03-01 23:59:23

Java開發

2011-09-27 13:25:05

Web

2024-06-12 12:28:23

2011-05-20 11:12:01

數據庫DB2優化

2023-03-31 15:10:32

PythonVSCode程序員
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 久久99精品国产 | 精品福利一区二区三区 | 在线视频一区二区三区 | 亚洲人成人一区二区在线观看 | 亚洲欧美中文日韩在线v日本 | 欧美日日 | av电影一区| 国产精品久久久久无码av | 精品国产一区二区三区四区在线 | 一级久久久久久 | 成人免费一区二区三区视频网站 | 99re6在线视频 | 中国一级大黄大片 | 国际精品鲁一鲁一区二区小说 | 最新中文在线视频 | 日韩成人免费视频 | 欧美在线视频a | 中文福利视频 | 欧美成人激情视频 | 淫片专区| 久久99精品国产自在现线小黄鸭 | 国产在线第一页 | 国产成人精品在线 | 久草精品视频 | 美女久久久久 | 亚洲视频在线免费观看 | 玖玖国产 | 午夜小电影 | 国产精品福利视频 | 在线欧美日韩 | 中文字幕第一页在线 | 永久av| 嫩草视频在线免费观看 | 国产精品1区2区 | av中文字幕在线播放 | 免费一区二区 | 亚洲一区中文字幕在线观看 | 日韩欧美国产精品一区二区三区 | 天堂av免费观看 | 久久久久久综合 | 中文字幕三区 |