SpringBoot整合RocketMQ入門示例
環境:springboot2.3.9 + RocketMQ4.8.0
RocketMQ機構及概念
1 消息模型(Message Model)
RocketMQ主要由 Producer、Broker、Consumer 三部分組成,其中Producer 負責生產消息,Consumer 負責消費消息,Broker 負責存儲消息。Broker 在實際部署過程中對應一臺服務器,每個 Broker 可以存儲多個Topic的消息,每個Topic的消息也可以分片存儲于不同的 Broker。Message Queue 用于存儲消息的物理地址,每個Topic中的消息地址存儲于多個 Message Queue 中。ConsumerGroup 由多個Consumer 實例構成。
2 消息生產者(Producer)
負責生產消息,一般由業務系統負責生產消息。一個消息生產者會把業務應用系統里產生的消息發送到broker服務器。RocketMQ提供多種發送方式,同步發送、異步發送、順序發送、單向發送。同步和異步方式均需要Broker返回確認信息,單向發送不需要。
3 消息消費者(Consumer)
負責消費消息,一般是后臺系統負責異步消費。一個消息消費者會從Broker服務器拉取消息、并將其提供給應用程序。從用戶應用的角度而言提供了兩種消費形式:拉取式消費、推動式消費。
4 主題(Topic)
表示一類消息的集合,每個主題包含若干條消息,每條消息只能屬于一個主題,是RocketMQ進行消息訂閱的基本單位。
5 代理服務器(Broker Server)
消息中轉角色,負責存儲消息、轉發消息。代理服務器在RocketMQ系統中負責接收從生產者發送來的消息并存儲、同時為消費者的拉取請求作準備。代理服務器也存儲消息相關的元數據,包括消費者組、消費進度偏移和主題和隊列消息等。如下圖:
6 名字服務(Name Server)
名稱服務充當路由消息的提供者。生產者或消費者能夠通過名字服務查找各主題相應的Broker IP列表。多個Namesrv實例組成集群,但相互獨立,沒有信息交換。
7 拉取式消費(Pull Consumer)
Consumer消費的一種類型,應用通常主動調用Consumer的拉消息方法從Broker服務器拉消息、主動權由應用控制。一旦獲取了批量消息,應用就會啟動消費過程。
8 推動式消費(Push Consumer)
Consumer消費的一種類型,該模式下Broker收到數據后會主動推送給消費端,該消費模式一般實時性較高。
9 生產者組(Producer Group)
同一類Producer的集合,這類Producer發送同一類消息且發送邏輯一致。如果發送的是事務消息且原始生產者在發送之后崩潰,則Broker服務器會聯系同一生產者組的其他生產者實例以提交或回溯消費。
10 消費者組(Consumer Group)
同一類Consumer的集合,這類Consumer通常消費同一類消息且消費邏輯一致。消費者組使得在消息消費方面,實現負載均衡和容錯的目標變得非常容易。要注意的是,消費者組的消費者實例必須訂閱完全相同的Topic。RocketMQ 支持兩種消息模式:集群消費(Clustering)和廣播消費(Broadcasting)。
11 集群消費(Clustering)
集群消費模式下,相同Consumer Group的每個Consumer實例平均分攤消息。
12 廣播消費(Broadcasting)
廣播消費模式下,相同Consumer Group的每個Consumer實例都接收全量的消息。
13 普通順序消息(Normal Ordered Message)
普通順序消費模式下,消費者通過同一個消費隊列收到的消息是有順序的,不同消息隊列收到的消息則可能是無順序的。
14 嚴格順序消息(Strictly Ordered Message)
嚴格順序消息模式下,消費者收到的所有消息均是有順序的。
15 消息(Message)
消息系統所傳輸信息的物理載體,生產和消費數據的最小單位,每條消息必須屬于一個主題。RocketMQ中每個消息擁有唯一的Message ID,且可以攜帶具有業務標識的Key。系統提供了通過Message ID和Key查詢消息的功能。
16 標簽(Tag)
為消息設置的標志,用于同一主題下區分不同類型的消息。來自同一業務單元的消息,可以根據不同業務目的在同一主題下設置不同標簽。標簽能夠有效地保持代碼的清晰度和連貫性,并優化RocketMQ提供的查詢系統。消費者可以根據Tag實現對不同子主題的不同消費邏輯,實現更好的擴展性。
ActiveMQ,Kafka,RocketMQ對比:
RocketMQ服務
1 下載RocketMQ
2 配置環境變量
3 啟動Name Server
4 啟動 Broker
5 通過命令行發送 & 接收消息
設置環境變量:
- C:\Users\MSI-NB>set NAMESRV_ADDR=localhost:9876
發送消息:
- C:\Users\MSI-NB>tools org.apache.rocketmq.example.quickstart.Producer
接收消息:
SpringBoot整合RocketMQ入門
依賴:
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-spring-boot-starter</artifactId>
- <version>2.2.0</version>
- </dependency>
配置文件:
- rocketmq:
- nameServer: localhost:9876
- producer:
- group: demo-mq
生產者:
- @Service
- public class ProducerService {
- @Resource
- private RocketMQTemplate rocketMQTemplate ;
- public void send(String message) {
- rocketMQTemplate.convertAndSend("test-topic", message);
- }
- }
消費者:
- @RocketMQMessageListener(topic = "test-topic", consumerGroup = "consumer01-group")
- @Component
- public class ConsumerListener implements RocketMQListener<String> {
- @Override
- public void onMessage(String message) {
- System.out.println("接收到消息:" + message) ;
- }
- }
這里的topic要和發送端設置的一致,consumerGroup可隨意。
發送接口:
- @RestController
- @RequestMapping("/messages")
- public class MessageController {
- @Resource
- private ProducerService ps ;
- @GetMapping("")
- public Object send(String message) {
- ps.send(message) ;
- return "send success" ;
- }
- }
測試:
發送消息指定tags
發送時:
- rocketMQTemplate.convertAndSend("test-topic:tag1", message);
接收時:
- @RocketMQMessageListener(topic = "test-topic", consumerGroup = "consumer01-group", selectorExpression = "tag1")
selectorExpression:默認是 “*” ,這里指定與發送的一致;
這里看下源碼:
RocketMQUtil.java
這里topic與tags是用冒號 ":" 分割的,tags就是取的數組的第二個。