我怎么不知道RocketMQ生產者有這么多用法?
本文轉載自微信公眾號「大魚仙人」,作者大魚。轉載本文請聯系大魚仙人公眾號。
前言
消息隊列RocketMQ版是阿里云基于Apache RocketMQ構建的低延遲、高并發、高可用、高可靠的分布式消息中間件。
看過我之前幾篇文章的應該都大概隊消息隊列有個概念了,都明白了,那這個消息從何而來呢?
所謂黃河之水天上來,大自然間每一個事物都不是平白無故來的吧?????怎么來的,????它母親生產的;香奈兒????怎么來的,機器加原料生產的;就連平時吃的大米,也是有出處的;咱們是怎么來的,咱們當然是偉大的母親生產下來的了
順便感謝一下偉大的母親,周日記得給她打個電話哦
下面進入主題,這是分割線
消息隊列RocketMQ版既可為分布式應用系統提供異步解耦和削峰填谷的能力,同時也具備互聯網應用所需的海量消息堆積、高吞吐、可靠重試等特性。下面列舉了一些特點
- 消息查詢:消息隊列RocketMQ版提供了三種消息查詢的方式,分別是按Message ID、Message Key以及Topic查詢
- 查詢消息軌跡:通過消息軌跡,能清晰定位消息從生產者發出,經由消息隊列RocketMQ版服務端,投遞給消息消費者的完整鏈路,方便定位排查問題
- 集群消費和廣播消費:當使用集群消費模式時,消息隊列RocketMQ版認為任意一條消息只需要被消費者集群內的任意一個消費者處理即可;當使用廣播消費模式時,消息隊列RocketMQ版會將每條消息推送給消費者集群內所有注冊過的消費者,保證消息至少被每臺機器消費一次
- 重置消費位點:根據時間或位點重置消費進度,允許用戶進行消息回溯或者丟棄堆積消息
- 死信隊列:將無法正常消費的消息儲存到特殊的死信隊列供后續處理
- 全球信息路由:用于全球不同地域之間的消息同步,保證地域之間的數據一致性
客戶端,其實很容易理解了,我們可以把RocketMQ理解成一個消息服務,既然是一個服務,我們就需要調用這個服務,那么調用這個服務的時候,這個消息從哪里來,這個就是要根據業務場景來定了,所以啊,消息的生產者Producer屬于一個客戶端;消息產生了,總不能一直放著吧,總要有人處理掉這些消息吧,這也是業務決定的,所以消息的消費者consumer也是屬于客戶端。
下面啊,大魚就帶著大家一起來看看這客戶端的用處
生產者Producer
生產者Producer,顧名思義,就是負責生產消息的,此時大家應該腦子有很多問號才對,比如Producer發消息發到哪里了,流程是怎么樣的,發的消息都是什么類型的等等這些,這些問題搞懂了的話,Producer這個客戶端基本就搞定了
魚魚教大家一個小技巧,學習一個東西,先搞懂大體流程,再拆分而細攻之,最后再統籌理解,這樣效果會很好,獨家秘方
接下來我從消息是如何發送的(負載均衡、容錯機制)、消息發給誰和存儲到哪里、消息的類型三方面來介紹Producer
1、消息是如何發送的?
首先,消息總不能產生了哪里也不去吧,那產生這個消息就沒有任何意義了,所以這個消息總要發送到一個地方去,接力傳遞,看下面這個圖
Producer會首先從本地緩存中獲取到指定的Topic,如果找到就直接根據這個Topic發送產生的消息,緩存大家都明白啊,就是為了優化速度,減少網絡傳輸。
沒有的話,就要去NameServer獲取最新的Topic列表(這個是Broker啟動的時候注冊到NameServer上的),通過一定的策略選擇一個MessageQueue隊列,獲取這個mq所在的Broker地址,也是先從本地緩存中獲取,如果獲取不到則請求NameServer獲取(NameServer中也同樣注冊了Broker地址和Topic的映射關系),進行發送消息
發送失敗的話,會有重試機制,默認是重試三次
其實保存這么多,既能減少和NameServer之間的網絡傳輸,又能減小NameServer的壓力,NameServer本身就是屬于輕量級的設計,這樣也有利于減輕NameServer的壓力,NameServer我也會單獨寫一篇來介紹
負載均衡
我們知道消息發送的時候會首先選擇一個對應的Topic,每個Topic會對應多個MessageQueue,這樣就有一個問題,發消息的時候要是做不到雨露均沾,可能就會有的隊列多,有的隊列少這樣的問題,就會造成資源的浪費
RocketMQ采用了樸素的方式,沒錯,就是輪詢,高端的食材往往只需要最樸素的烹飪方式~
生產者通過輪詢某個 Topic 下的所有 MessageQueue 的方式來實現發送方的負載均衡,簡單來說就是人人都有份,如下圖:
通過這種方式,可以將一個 Topic 的消息分散到多個 MessageQueue 上,進而分散到多個 Broker 上。
發送消息的容錯機制:
Producer 作為發送消息的一方,有3種容錯機制:
- 本地緩存:把從 NameSever 獲取的信息緩存到本地,以防 NameSever 宕機
- 不可用Broker集合:Producer有一個 Broker 的容錯機制,開關sendLatencyFaultEnable可以開啟,RocketMq內部會維護一個故障Broker的HashMap,把一定延遲級別的Broker放入這個map,下次選擇Broker的時候,就會規避不可用的Broker。
- 重試:Producer發送消息時,有一個重試機制,默認重試3次。死信隊列 Consumer消費重試超過指定次數,進入死信隊列
通過這種方式,可以將一個 Topic 的消息分散到多個 MessageQueue 上,進而分散到多個 Broker 上。
2、消息發給誰和存儲在哪里?
Producer連接NameSever
Producer 通過 NameSever 獲取指定 Topic 的 Broker 路由信息,并在本地保存一份緩存數據,比如一個Topic有哪些 MessageQueue,MessageQueue 在哪幾臺 Broker 上,Broker 的ip.port等等。Producer 發送消息只發到 Master Broker上,Slave 通過主從同步獲取數據。
那么 Produce 是怎么連接NameSever 的呢
- 連接:單個生產者者和一臺 Nameserver 保持長連接,定時查詢topic配置信息,如果該nameserver掛掉,生產者會自動連接下一個nameserver,直到有可用連接為止,并能自動重連。
- 輪詢時間:默認情況下,生產者每隔30秒從nameserver獲取所有topic的最新隊列情況,這意味著某個broker如果宕機,生產者最多要30秒才能感知,在此期間,發往該broker的消息發送失敗。該時間由DefaultMQProducer的pollNameServerInteval參數決定,可手動配置。
- 心跳:與nameserver沒有心跳
Producer連接Broker
- 連接:生產者 跟 Topic 涉及的所有Broker 保持長連接。
- 心跳:默認情況下,生產者每隔30秒向所有broker發送心跳。broker每隔10秒鐘(此時間無法更改),掃描所有還存活的連接,若某個連接2分鐘內(當前時間與最后更新時間差值超過2分鐘,此時間無法更改)沒有發送心跳數據,則關閉連接
Producer連接上Broker之后,消息會通過輪詢的方式發送到Broker上,并且存儲在Broker中的CommitLog中,這里面存儲的是原始消息,還有一個ConsumeQueue用于存儲投遞到某一個queue的消息的位置信息。當然,消息隊列會持久化到磁盤中的,不影響內存,當然也會定期清理消息。
那消費完的消息去了哪里呢?什么時候清理物理消息文件呢?還有這樣設計的好處呢?
這些我們都留在下下一篇中,也就是Broker篇,讓你透徹了解Broker這個大腦是如何助力RocketMQ支持這么高的吞吐量的
總之啊,這個問題值得大家深入研究一下,如果再面試的時候,你不僅能說出RocketMQ的用處,你還能說出它的存儲原理和尋址原理,那面試官就愛上你了。此時你再拿出王炸,就是解決各種實際問題的能力,比如如何處理重復消息啊、如何保證消息的順序性啊、在分布式系統中如何保證分布式事務啊
面試官當場給你發offer,say:How much money do you expect to work for us ?
3、消息的種類
RocketMQ種的消息種類大致可以分為四種:普通消息、定時和延時消息、順序消息、事務消息四種類型,這是重點!
簡單介紹下四種類型
- 普通消息:消息隊列RocketMQ版中無特性的消息,區別于有特性的定時和延時消息、順序消息和事務消息。
- 定時和延時消息:允許消息生產者對指定消息進行定時(延時)投遞,最長支持40天。
- 順序消息:允許消息消費者按照消息發送的順序對消息進行消費。
- 事務消息:實現類似X或Open XA的分布事務功能,以達到事務最終一致性狀態。
消息隊列RocketMQ提供的四種消息類型所對應的Topic不能混用,例如,創建的普通消息的Topic只能用于收發普通消息,不能用于收發其他類型的消息;同理,事務消息的Topic也只能收發事務消息,不能用于收發其他類型的消息,以此類推
普通消息
普通消息:消息隊列RocketMQ中無特性的消息,區別于有特性的定時和延時消息、順序消息和事務消息
普通消息以三種發送方式:同步Sync發送、異步Async發送和單向Oneway發送
同步就是我們發送了消息之后必須等到服務器響應之后才能發送下一個;異步適用于對時間較敏感的業務場景,異步不需要等待服務器的響應就可以連續發送消息;單向則比異步用時更短,一般在微秒級別,但是可靠性會降低,因為只管發送,不等待服務器響應,也沒有回調函數觸發
同步發送
同步,消息發送方發出一條消息后,會在收到服務端返回響應之后才發下一條消息的通訊方式
異步發送
異步發送是指發送方發出一條消息后,不等服務端返回響應,接著發送下一條消息的通訊方式
消息隊列RocketMQ版的異步發送,需要實現異步發送回調接口(SendCallback)。消息發送方在發送了一條消息后,不需要等待服務端響應即可發送第二條消息。發送方通過回調接口接收服務端響應,并處理響應結果
一般用于對時間較敏感的業務場景
單向發送
發送方只負責發送消息,不等待服務端返回響應且沒有回調函數觸發,即只發送請求不等待應答。此方式發送消息的過程耗時非常短,一般在微秒級別
應用于對可靠性要求并不高的場景,比如日志收集
定時和延時消息
定時和延時消息:允許消息生產者對指定消息進行定時(延時)投遞,最長支持40天
延時消息用于指定消息發送到消息隊列RocketMQ版的服務端后,延時一段時間才被投遞到客戶端進行消費(例如3秒后才被消費),適用于解決一些消息生產和消費有時間窗口要求的場景,或者通過消息觸發延遲任務的場景,類似于延遲隊列。
定時消息可以做到在指定時間戳之后才可被消費者消費,適用于對消息生產和消費有時間窗口要求,或者利用消息觸發定時任務的場景。
適用場景
通過消息來觸發一些定時任務,這個時候這個定時消息就派上用場了,比如在某一時間向用戶發送的提醒消息;一些消息生產和消費之間有時間窗口,比如典型的電商里面的超時未支付關閉訂單的場景,這時延時消息就派上用場了,超時未完成支付就關閉訂單
定時消息的精度會有1s~2s的延遲誤差
其實定時消息和延時消息在使用的時候也是有一些差別的,用過的應該都知道,給大家提一下,定時消息需要明確指定消息發送時間點之后的某一時間點作為消息投遞的時間點;延時消息則需要設定一個延時的時間長度,長度是固定的,但是時刻點不是固定,是根據發送消息的時間點有關的,消息將從當前發送時間點開始延遲固定時間之后才開始投遞,這個大家應該都很清楚了,淘寶下個單,給你留30分鐘時間支付,超時未支付則關閉訂單
順序消息
順序消息:允許消息消費者按照消息發送的順序進行消息的發送
順序消息分為兩類:
- 全局順序:對于指定的一個Topic,所有消息按照嚴格的先入先出FIFO(First In First Out)的順序進行發布和消費。
- 分區順序:對于指定的一個Topic,所有消息根據Sharding Key進行區塊分區。同一個分區內的消息按照嚴格的FIFO順序進行發布和消費。Sharding Key是順序消息中用來區分不同分區的關鍵字段,和普通消息的Key是完全不同的概念。
其實這也是個比較經典的問題,面試也是比較常問的,就是如何保證順序性?魚魚反正會回答,你會嗎?
如果遇到這個問題,首先你要分情況說明,就是分為全局順序和分區順序這兩種情況:
1、全局順序適用于性能要求不高,所有的消息都要嚴格按照先進先出的順序來發布和消費的場景。這種情況我也沒遇到過,一般也不太會使用全局有序這種
2、分區順序適用于性能要求比較高,以Sharding Key作為分區字段,在用一個區塊中嚴格按照先進先出的順序發布和消費。比如用戶注冊的時候的驗證碼,以用戶ID作為Sharding Key,那么同一個用戶發送的消息都會按照發布的先后順序來消費,再比如就是電商中的訂單流程問題
阿里巴巴集團內部電商系統均使用分區順序消息,既保證業務的順序,同時又能保證業務的高性能。別問我怎么知道的,阿里云官網寫的
順序消息常見問題
為什么全局順序消息性能一般?
全局順序消息是嚴格按照FIFO的消息阻塞原則,即上一條消息沒有被成功消費,那么下一條消息會一直被存儲到Topic隊列中。如果想提高全局順序消息的TPS,可以升級實例配置,同時消息客戶端應用盡量減少處理本地業務邏輯的耗時。
順序消息支持哪種消息發送方式?是否支持集群消費和廣播消費?
順序消息只支持可靠同步發送方式,不支持異步發送方式,否則將無法嚴格保證順序。順序消息暫時僅支持集群消費模式,不支持廣播消費模式。
事務消息
事務消息:實現類似X或者Open XA的分布式事務功能,以達到最終一致性
消息隊列RocketMQ版提供類似X或Open XA的分布式事務功能,通過消息隊列RocketMQ版事務消息,能達到分布式事務的最終一致。
半事務消息:暫不能投遞的消息,發送方已經成功地將消息發送到了消息隊列RocketMQ版服務端,但是服務端未收到生產者對該消息的二次確認,此時該消息被標記成“暫不能投遞”狀態,處于該種狀態下的消息即半事務消息。
消息回查:由于網絡閃斷、生產者應用重啟等原因,導致某條事務消息的二次確認丟失,消息隊列RocketMQ版服務端通過掃描發現某條消息長期處于“半事務消息”時,需要主動向消息生產者詢問該消息的最終狀態(Commit或是Rollback),該詢問過程即消息回查。
跟小仙來看看事務消息發送步驟:
1、發送方將半事務消息發送到服務端Broker,服務端會將消息持久化,成功之后會返回ACK確認消息已經發送成功,此時消息為半事務消息
2、發送方開始執行本地事務的邏輯
3、發送方會根據本地事務的執行結果向服務端提交二次確認,決定Commit還是Rollback,服務端收到Commit之后則把這個消息標記為可投遞,發送到消費方;服務端收到Rollback之后則刪除半事務消息,服務端不會發送,則消費方也不會收到
如可是如果斷網或者應用重啟這些情況,上述的步驟的二次確認信息無法到達服務端,怎么辦?
這里其實有個回查機制,發送方發送消息之后,需要本地執行事務,如果事務執行的過程出現卡死的情況,或者事務執行結果因為網絡等問題,無法傳遞事務結果到服務端,服務端會執行一個回查機制,來確認這個半事務消息的最終提交情況。
總結
消息隊列RocketMQ版的消費者和生產者客戶端對象是線程安全的,可以在多個線程之間共享使用。可以在服務器上(或者多臺服務器)部署多個生產者和消費者實例,也可以在同一個生產者或消費者實例里采用多線程發送或接收消息,從而提高消息發送或接收TPS。避免為每個線程創建一個客戶端實例。
好了,回顧一下本篇的內容吧
1、消息發送的負載均衡、容錯機制
2、消息發送流程和存儲(具體如何存儲會在Broker篇說,因為這些東西都存儲在Broker的CommitLog和ConsumerQueue中了)
3、消息的類型:普通消息(同步發送、異步發送、單向發送)、定時和延時消息、順序消息(全局順序和部分順序)、事務消息