SpringBoot 集成 RocketMQ:異步消息隊列實戰,讓系統飛起來!
作者:fareboy
本文將手把手教你如何在 SpringBoot 中集成 RocketMQ,實現完整的異步消息處理流程!
引言:為什么需要異步消息隊列?
在現代高并發系統中,解耦服務、削峰填谷、異步處理已成為架構設計的核心需求。RocketMQ 作為阿里巴巴開源的分布式消息中間件,憑借其高吞吐、低延遲、高可用的特性,成為企業級應用的首選解決方案。
本文將手把手教你如何在 SpringBoot 中集成 RocketMQ,實現完整的異步消息處理流程!
一、環境準備與項目搭建
1. 技術棧
- JDK 1.8+
- SpringBoot 2.7.x
- RocketMQ 4.9.3
- RocketMQ-Spring-Boot-Starter 2.2.2
2. 添加依賴
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
二、核心實現:生產者與消費者
1. 配置文件 (application.yml)
rocketmq:
name-server: 127.0.0.1:9876 # RocketMQ NameServer地址
producer:
group: order_producer_group # 生產者組名
send-message-timeout: 3000 # 發送超時時間(ms)
2. 消息生產者服務
@Service
@RequiredArgsConstructor
public class OrderProducer {
private final RocketMQTemplate rocketMQTemplate;
// 發送普通消息
public void sendOrderMessage(Order order) {
Message<Order> message = MessageBuilder.withPayload(order)
.setHeader(RocketMQHeaders.KEYS, order.getOrderId())
.build();
rocketMQTemplate.send("order_topic", message);
log.info("訂單消息已發送: {}", order);
}
// 發送延遲消息(30秒后消費)
public void sendDelayMessage(Order order) {
rocketMQTemplate.syncSend("delay_order_topic",
MessageBuilder.withPayload(order).build(),
2000, // 發送超時
3 // 延遲級別(對應30秒)
);
log.info("延遲訂單消息已發送: {}", order);
}
}
3. 消息消費者服務
@Slf4j
@Service
@RocketMQMessageListener(
topic = "order_topic",
consumerGroup = "order_consumer_group",
selectorType = SelectorType.TAG,
selectorExpression = "normal || vip" // 過濾標簽
)
public class OrderConsumer implements RocketMQListener<Order> {
@Override
public void onMessage(Order order) {
log.info("收到訂單消息,開始處理: {}", order);
// 業務處理邏輯...
processOrder(order);
}
private void processOrder(Order order) {
// 模擬業務處理
log.info("訂單處理完成: {}", order.getOrderId());
}
}
4. 訂單實體類
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Order implements Serializable {
private String orderId;
private BigDecimal amount;
private LocalDateTime createTime;
private String userId;
private String orderType; // 用于消息過濾
}
三、高級特性實現
1. 事務消息處理(解決分布式事務)
@Slf4j
@Service
@RocketMQTransactionListener
public class OrderTransactionListenerImpl implements RocketMQLocalTransactionListener {
@Autowired
private OrderService orderService;
// 執行本地事務
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
Order order = (Order) msg.getPayload();
orderService.createOrder(order); // 本地數據庫操作
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
log.error("本地事務執行失敗", e);
return RocketMQLocalTransactionState.ROLLBACK;
}
}
// 本地事務狀態回查
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
String orderId = msg.getHeaders().get("orderId").toString();
return orderService.checkOrderExists(orderId) ?
RocketMQLocalTransactionState.COMMIT :
RocketMQLocalTransactionState.ROLLBACK;
}
}
2. 消息重試與死信隊列
// 消費者配置重試策略
@RocketMQMessageListener(
topic = "important_order_topic",
consumerGroup = "important_order_group",
maxReconsumeTimes = 3 // 最大重試次數
)
public class ImportantOrderConsumer implements RocketMQListener<Order> {
@Override
public void onMessage(Order order) {
try {
processImportantOrder(order);
} catch (Exception e) {
throw new RuntimeException("處理失敗,觸發重試");
}
}
// 達到最大重試次數后,消息進入死信隊列
// 死信隊列命名: %DLQ%consumerGroup
}
四、性能優化技巧
1. 批量消息發送 - 提升吞吐量
List<Message<Order>> messages = orders.stream()
.map(order -> new Message<>("batch_order_topic", order))
.collect(Collectors.toList());
SendResult result = rocketMQTemplate.syncSend("batch_order_topic", messages, 3000);
2. 消費端并發配置
@RocketMQMessageListener(
topic = "high_concurrency_topic",
consumerGroup = "high_concurrency_group",
consumeThreadNumber = 32, // 消費線程數
consumeTimeout = 15L // 消費超時(分鐘)
)
3. 消息過濾優化 - 使用SQL表達式
@RocketMQMessageListener(
topic = "filtered_order_topic",
consumerGroup = "filtered_order_group",
selectorType = SelectorType.SQL92,
selectorExpression = "amount > 100 AND userId LIKE 'VIP%'"
)
五、部署與監控
1. RocketMQ集群部署建議
圖片
2. 監控方案
- RocketMQ控制臺:實時查看隊列情況
- Prometheus + Grafana:監控關鍵指標
消息堆積量
發送/消費TPS
消費延遲
- 日志監控:ELK收集分析日志
結語
責任編輯:武曉燕
來源:
小林聊編程