消息的發送-RocketMQ知識體系(二)
上一篇認識了一下RocketMQ,本文講講講RocketMQ生產端的那些事兒,消息的發送相關的原理。
消息發送的流程
RocketMQ 客戶端的消息發送可以分為以下三層:
業務層:直接調用 MQ Client 發送 API 的業務代碼;
消息處理層:RocketMQ Client 獲取業務發送的消息對象后,一系列的參數檢查、消息發送準備、參數包裝等操作;
通信層:RocketMQ 基于 Netty 封裝的一個 RPC 通信服務,RocketMQ 的各個組件之間的通信全部使用這個模塊;
大概的流程:
- Broker啟動時,向NameServer注冊信息
- 客戶端調用producer發送消息時,會先從NameServer獲取該topic的路由信息。消息頭code為GET_ROUTEINFO_BY_TOPIC
- 從NameServer返回的路由信息,包括topic包含的隊列列表和broker列表
- Producer端根據查詢策略,選出其中一個隊列,用于后續存儲消息
- 每條消息會生成一個唯一id,添加到消息的屬性中。屬性的key為UNIQ_KEY
- 對消息做一些特殊處理,比如:超過4M會對消息進行壓縮
- producer向Broker發送rpc請求,將消息保存到broker端。消息頭的code為SEND_MESSAGE或SEND_MESSAGE_V2(配置文件設置了特殊標志)
消息的數據結構
消息(Message)
消息系統所傳輸信息的物理載體,生產和消費數據的最小單位,每條消息必須屬于一個主題。RocketMQ 中每個消息擁有唯一的 Message ID,且可以攜帶具有業務標識的 Key。系統提供了通過 Message ID 和 Key 查詢消息的功能。
核心字段配置
其他Message配置
Producer配置
消息發送方式
Rocketmq提供三種方式可以發送普通消息:同步、異步、和單向發送。
- 同步:發送方發送消息后,收到服務端響應后才發送下一條消息
- 異步:發送一條消息后,不等服務端返回就可以繼續發送消息或者后續任務處理。發送方通過回調接口接收服務端響應,并處理響應結果。
- OneWay:發送方發送消息,不等待服務端返回響應且沒有回調函數觸發,即只發送請求不需要應答。
發送方式對比:發送吞吐量,單向>異步>同步。但單向發送可靠性差存在丟失消息可能,選型根據實際需求確定。
2、消息類型
消息客戶端提供多種SDK:普通、順序、事務、延時消息
Producer負載均衡
producer在發送消息時,默認輪詢所有queue,消息就會被發送到不同的queue上。而queue可以分布在不同的broker上。
生產者高可用
【應用場景】
假如現在有個由三個 broker 節點組成的集群,有 topic1,默認在每個 broker 上創建 4 個隊列,分別是:master-a(q0,q1,q2,q3)、master-b(q0,q1,q2,q3)、master-c(q0,q1,q2,q3),上一次發送消息到 master-a 的 q0 隊列,此時 master-a 宕機了,如果繼續發送 topic1 消息,如果避免再次發送到 master-a?
rocketmq 的解決方案:
發送失敗重試和 Broker 故障延遲規避機制。通過配置項 retryTimesWhenSendFailed 來表示同步重試次數,默認為 2 次,加上正常發送 1 次,總共三次機會;選擇隊列的方式通過 sendLatencyFaultEnable 的值來控制,默認值為 false,不啟動 broker 故障延遲機制,值為 true 時啟用 broker 故障延遲機制。
(1)發送失敗重試
RocketMQ 支持同步、異步發送,無論哪種方法都可以在失敗后重試,如果單個 Broker 發生故障,重試會選擇其他 Broker 保證消息的正常發送。
失敗重試的邏輯:
(2)Broker 規避機制
RocketMQ Client 會維護一個“Broker-發送延遲”關系,根據這個關系選擇一個發送延遲級別較低的 Broker,這樣能最大限度地利用 Broker 的能力,剔除已經宕機、不可用或發送延遲級別較高的 Broker,盡可能保證消息正常發送。
NameServer掛了怎么辦?
如果Namesrv掛了,當新加入的生產消費則獲取不到topic路由信息會報MQExecption;如果生產消費緩存了生產者有緩存 Topic 的路由信息,如果NameServer 全部掛掉,并且,此時依然可以發送消息。