一口氣說(shuō)出六種實(shí)現(xiàn)延時(shí)消息的方案
前言
延時(shí)消息(定時(shí)消息)指的在分布式異步消息場(chǎng)景下,生產(chǎn)端發(fā)送一條消息,希望在指定延時(shí)或者指定時(shí)間點(diǎn)被消費(fèi)端消費(fèi)到,而不是立刻被消費(fèi)。
延時(shí)消息適用的業(yè)務(wù)場(chǎng)景非常的廣泛,在分布式系統(tǒng)環(huán)境下,延時(shí)消息的功能一般會(huì)在下沉到中間件層,通常是 MQ 中內(nèi)置這個(gè)功能或者內(nèi)聚成一個(gè)公共基礎(chǔ)服務(wù)。
本文旨在探討常見(jiàn)延時(shí)消息的實(shí)現(xiàn)方案以及方案設(shè)計(jì)的優(yōu)缺點(diǎn)。
實(shí)現(xiàn)方案
1.基于外部存儲(chǔ)實(shí)現(xiàn)的方案
這里討論的外部存儲(chǔ)指的是在 MQ 本身自帶的存儲(chǔ)以外又引入的其他的存儲(chǔ)系統(tǒng)。
基于外部存儲(chǔ)的方案本質(zhì)上都是一個(gè)套路,將 MQ 和 延時(shí)模塊 區(qū)分開(kāi)來(lái),延時(shí)消息模塊是一個(gè)獨(dú)立的服務(wù)/進(jìn)程。延時(shí)消息先保留到其他存儲(chǔ)介質(zhì)中,然后在消息到期時(shí)再投遞到 MQ。
當(dāng)然還有一些細(xì)節(jié)性的設(shè)計(jì),比如消息進(jìn)入的延時(shí)消息模塊時(shí)已經(jīng)到期則直接投遞這類(lèi)的邏輯,這里不展開(kāi)討論。
下述方案不同的是,采用了不同的存儲(chǔ)系統(tǒng)。
基于 數(shù)據(jù)庫(kù)(如MySQL)
基于關(guān)系型數(shù)據(jù)庫(kù)(如MySQL)延時(shí)消息表的方式來(lái)實(shí)現(xiàn)。
CREATE TABLE `delay_msg` (
`id` bigint unsigned NOT NULL AUTO_INCREMENT,
`delivery_time` DATETIME NOT NULL COMMENT '投遞時(shí)間',
`payloads` blob COMMENT '消息內(nèi)容',
PRIMARY KEY (`id`),
KEY `time_index` (`delivery_time`)
)
通過(guò)定時(shí)線程定時(shí)掃描到期的消息,然后進(jìn)行投遞。定時(shí)線程的掃描間隔理論上就是你延時(shí)消息的最小時(shí)間精度。
優(yōu)點(diǎn):
- 實(shí)現(xiàn)簡(jiǎn)單;
缺點(diǎn):
- B+Tree索引不適合消息場(chǎng)景的大量寫(xiě)入;
基于 RocksDB
RocksDB 的方案其實(shí)就是在上述方案上選擇了比較合適的存儲(chǔ)介質(zhì)。
RocksDB 在筆者之前的文章中有聊過(guò),LSM 樹(shù)更適合大量寫(xiě)入的場(chǎng)景。滴滴開(kāi)源的DDMQ中的延時(shí)消息模塊 Chronos 就是采用了這個(gè)方案。
DDMQ 這個(gè)項(xiàng)目簡(jiǎn)單來(lái)說(shuō)就是在 RocketMQ 外面加了一層統(tǒng)一的代理層,在這個(gè)代理層就可以做一些功能維度的擴(kuò)展。延時(shí)消息的邏輯就是代理層實(shí)現(xiàn)了對(duì)延時(shí)消息的轉(zhuǎn)發(fā),如果是延時(shí)消息,會(huì)先投遞到 RocketMQ 中 Chronos 專(zhuān)用的 topic 中。
延時(shí)消息模塊 Chronos 消費(fèi)得到延時(shí)消息轉(zhuǎn)儲(chǔ)到 RocksDB,后面就是類(lèi)似的邏輯了,定時(shí)掃描到期的消息,然后往 RocketMQ 中投遞。
這個(gè)方案老實(shí)說(shuō)是一個(gè)比較重的方案。因?yàn)榛?RocksDB 來(lái)實(shí)現(xiàn)的話,從數(shù)據(jù)可用性的角度考慮,你還需要自己去處理多副本的數(shù)據(jù)同步等邏輯。
優(yōu)點(diǎn):
- RocksDB LSM 樹(shù)很適合消息場(chǎng)景的大量寫(xiě)入;
缺點(diǎn):
- 實(shí)現(xiàn)方案較重,如果你采用這個(gè)方案,需要自己實(shí)現(xiàn) RocksDB 的數(shù)據(jù)容災(zāi)邏輯;
基于 Redis
再來(lái)聊聊 Redis 的方案。下面放一個(gè)比較完善的方案。
- Messages Pool 所有的延時(shí)消息存放,結(jié)構(gòu)為KV結(jié)構(gòu),key為消息ID,value為一個(gè)具體的message(這里選擇Redis Hash結(jié)構(gòu)主要是因?yàn)閔ash結(jié)構(gòu)能存儲(chǔ)較大的數(shù)據(jù)量,數(shù)據(jù)較多時(shí)候會(huì)進(jìn)行漸進(jìn)式rehash擴(kuò)容,并且對(duì)于HSET和HGET命令來(lái)說(shuō)時(shí)間復(fù)雜度都是O(1))
- Delayed Queue是16個(gè)有序隊(duì)列(隊(duì)列支持水平擴(kuò)展),結(jié)構(gòu)為ZSET,value 為 messages pool中消息ID,score為過(guò)期時(shí)間(分為多個(gè)隊(duì)列是為了提高掃描的速度)
- Worker 代表處理線程,通過(guò)定時(shí)任務(wù)掃描 Delayed Queue 中到期的消息
這個(gè)方案選用 Redis 存儲(chǔ)在我看來(lái)有幾點(diǎn)考慮,
- Redis ZSET 很適合實(shí)現(xiàn)延時(shí)隊(duì)列
- 性能問(wèn)題,雖然 ZSET 插入是一個(gè) O(logn) 的操作,但是Redis 基于內(nèi)存操作,并且內(nèi)部做了很多性能方面的優(yōu)化。
但是這個(gè)方案其實(shí)也有需要斟酌的地方,上述方案通過(guò)創(chuàng)建多個(gè) Delayed Queue 來(lái)滿足對(duì)于并發(fā)性能的要求,但這也帶來(lái)了多個(gè) Delayed Queue 如何在多個(gè)節(jié)點(diǎn)情況下均勻分配,并且很可能出現(xiàn)到期消息并發(fā)重復(fù)處理的情況,是否要引入分布式鎖之類(lèi)的并發(fā)控制設(shè)計(jì)?
在量不大的場(chǎng)景下,上述方案的架構(gòu)其實(shí)可以蛻化成主從架構(gòu),只允許主節(jié)點(diǎn)來(lái)處理任務(wù),從節(jié)點(diǎn)只做容災(zāi)備份。實(shí)現(xiàn)難度更低更可控。
定時(shí)線程檢查的缺陷與改進(jìn)
上述幾個(gè)方案中,都通過(guò)線程定時(shí)掃描的方案來(lái)獲取到期的消息。
定時(shí)線程的方案在消息量較少的時(shí)候,會(huì)浪費(fèi)資源,在消息量非常多的時(shí)候,又會(huì)出現(xiàn)因?yàn)閽呙栝g隔設(shè)置不合理導(dǎo)致延時(shí)時(shí)間不準(zhǔn)確的問(wèn)題。可以借助 JDK Timer 類(lèi)中的思想,通過(guò) wait-notify 來(lái)節(jié)省 CPU 資源。
獲取中最近的延時(shí)消息,然后wait(執(zhí)行時(shí)間-當(dāng)前時(shí)間),這樣就不需要浪費(fèi)資源到達(dá)時(shí)間時(shí)會(huì)自動(dòng)響應(yīng),如果有新的消息進(jìn)入,并且比我們等待的消息還要小,那么直接notify喚醒,重新獲取這個(gè)更小的消息,然后又wait,如此循環(huán)。
2. 開(kāi)源 MQ 中的實(shí)現(xiàn)方案
再來(lái)講講目前自帶延時(shí)消息功能的開(kāi)源MQ,它們是如何實(shí)現(xiàn)的
RocketMQ
RocketMQ 開(kāi)源版本支持延時(shí)消息,但是只支持 18 個(gè) Level 的延時(shí),并不支持任意時(shí)間。只不過(guò)這個(gè) Level 在 RocketMQ 中可以自定義的,所幸來(lái)說(shuō)對(duì)普通業(yè)務(wù)算是夠用的。默認(rèn)值為“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18個(gè)level。
通俗的講,設(shè)定了延時(shí) Level 的消息會(huì)被暫存在名為SCHEDULE_TOPIC_XXXX的topic中,并根據(jù) level 存入特定的queue,queueId = delayTimeLevel – 1,即一個(gè)queue只存相同延時(shí)的消息,保證具有相同發(fā)送延時(shí)的消息能夠順序消費(fèi)。 broker會(huì)調(diào)度地消費(fèi)SCHEDULE_TOPIC_XXXX,將消息寫(xiě)入真實(shí)的topic。
下面是整個(gè)實(shí)現(xiàn)方案的示意圖,紅色代表投遞延時(shí)消息,紫色代表定時(shí)調(diào)度到期的延時(shí)消息:
優(yōu)點(diǎn):
- Level 數(shù)固定,每個(gè) Level 有自己的定時(shí)器,開(kāi)銷(xiāo)不大
- 將 Level 相同的消息放入到同一個(gè) Queue 中,保證了同一 Level 消息的順序性;不同 Level 放到不同的 Queue 中,保證了投遞的時(shí)間準(zhǔn)確性;
- 通過(guò)只支持固定的Level,將不同延時(shí)消息的排序變成了固定Level Topic 的追加寫(xiě)操作
缺點(diǎn):
- Level 配置的修改代價(jià)太大,固定 Level 不靈活
- CommitLog 會(huì)因?yàn)檠訒r(shí)消息的存在變得很大
Pulsar
Pulsar 支持“任意時(shí)間”的延時(shí)消息,但實(shí)現(xiàn)方式和 RocketMQ 不同。
通俗的講,Pulsar 的延時(shí)消息會(huì)直接進(jìn)入到客戶端發(fā)送指定的 Topic 中,然后在堆外內(nèi)存中創(chuàng)建一個(gè)基于時(shí)間的優(yōu)先級(jí)隊(duì)列,來(lái)維護(hù)延時(shí)消息的索引信息。延時(shí)時(shí)間最短的會(huì)放在頭上,時(shí)間越長(zhǎng)越靠后。在進(jìn)行消費(fèi)邏輯時(shí)候,再判斷是否有到期需要投遞的消息,如果有就從隊(duì)列里面拿出,根據(jù)延時(shí)消息的索引查詢到對(duì)應(yīng)的消息進(jìn)行消費(fèi)。
如果節(jié)點(diǎn)崩潰,在這個(gè) broker 節(jié)點(diǎn)上的 Topics 會(huì)轉(zhuǎn)移到其他可用的 broker 上,上面提到的這個(gè)優(yōu)先級(jí)隊(duì)列也會(huì)被重建。
下面是 Pulsar 公眾號(hào)中對(duì)于 Pulsar 延時(shí)消息的示意圖。
乍一看會(huì)覺(jué)得這個(gè)方案其實(shí)非常簡(jiǎn)單,還能支持任意時(shí)間的消息。但是這個(gè)方案有幾個(gè)比較大的問(wèn)題
- 內(nèi)存開(kāi)銷(xiāo): 維護(hù)延時(shí)消息索引的隊(duì)列是放在堆外內(nèi)存中的,并且這個(gè)隊(duì)列是以訂閱組(Kafka中的消費(fèi)組)為維度的,比如你這個(gè) Topic 有 N 個(gè)訂閱組,那么如果你這個(gè) Topic 使用了延時(shí)消息,就會(huì)創(chuàng)建 N 個(gè) 隊(duì)列;并且隨著延時(shí)消息的增多,時(shí)間跨度的增加,每個(gè)隊(duì)列的內(nèi)存占用也會(huì)上升。(是的,在這個(gè)方案下,支持任意的延時(shí)消息反而有可能讓這個(gè)缺陷更嚴(yán)重)
- 故障轉(zhuǎn)移之后延時(shí)消息索引隊(duì)列的重建時(shí)間開(kāi)銷(xiāo): 對(duì)于跨度時(shí)間長(zhǎng)的大規(guī)模延時(shí)消息,重建時(shí)間可能會(huì)到小時(shí)級(jí)別。(摘自 Pulsar 官方公眾號(hào)文章)
- 存儲(chǔ)開(kāi)銷(xiāo): 延時(shí)消息的時(shí)間跨度會(huì)影響到 Pulsar 中已經(jīng)消費(fèi)的消息數(shù)據(jù)的空間回收。打個(gè)比方,你的 Topic 如果業(yè)務(wù)上要求支持一個(gè)月跨度的延時(shí)消息,然后你發(fā)了一個(gè)延時(shí)一個(gè)月的消息,那么你這個(gè) Topic 中底層的存儲(chǔ)就會(huì)保留整整一個(gè)月的消息數(shù)據(jù),即使這一個(gè)月中99%的正常消息都已經(jīng)消費(fèi)了。
對(duì)于前面第一點(diǎn)和第二點(diǎn)的問(wèn)題,社區(qū)也設(shè)計(jì)了解決方案,在隊(duì)列中加入時(shí)間分區(qū),Broker 只加載當(dāng)前較近的時(shí)間片的隊(duì)列到內(nèi)存,其余時(shí)間片分區(qū)持久化磁盤(pán),示例圖如下圖所示:
但是目前,這個(gè)方案并沒(méi)有對(duì)應(yīng)的實(shí)現(xiàn)版本。可以在實(shí)際使用時(shí),規(guī)定只能使用較小時(shí)間跨度的延時(shí)消息,來(lái)減少前兩點(diǎn)缺陷的影響。另外,因?yàn)閮?nèi)存中存的并不是延時(shí)消息的全量數(shù)據(jù),只是索引,所以可能要積壓上百萬(wàn)條延時(shí)消息才可能對(duì)內(nèi)存造成顯著影響,從這個(gè)角度來(lái)看,官方暫時(shí)沒(méi)有完善前兩個(gè)問(wèn)題也可以理解了。
至于第三個(gè)問(wèn)題,估計(jì)是比較難解決的,需要在數(shù)據(jù)存儲(chǔ)層將延時(shí)消息和正常消息區(qū)分開(kāi)來(lái),單獨(dú)存儲(chǔ)延時(shí)消息。
QMQ
QMQ提供任意時(shí)間的延時(shí)/定時(shí)消息,你可以指定消息在未來(lái)兩年內(nèi)(可配置)任意時(shí)間內(nèi)投遞。
把 QMQ 放到最后,是因?yàn)槲矣X(jué)得 QMQ 是目前開(kāi)源 MQ 中延時(shí)消息設(shè)計(jì)最合理的。里面設(shè)計(jì)的核心簡(jiǎn)單來(lái)說(shuō)就是 多級(jí)時(shí)間輪 + 延時(shí)加載 + 延時(shí)消息單獨(dú)磁盤(pán)存儲(chǔ)。
QMQ的延時(shí)/定時(shí)消息使用的是兩層 hash wheel 來(lái)實(shí)現(xiàn)的。第一層位于磁盤(pán)上,每個(gè)小時(shí)為一個(gè)刻度(默認(rèn)為一個(gè)小時(shí)一個(gè)刻度,可以根據(jù)實(shí)際情況在配置里進(jìn)行調(diào)整),每個(gè)刻度會(huì)生成一個(gè)日志文件(schedule log),因?yàn)镼MQ支持兩年內(nèi)的延時(shí)消息(默認(rèn)支持兩年內(nèi),可以進(jìn)行配置修改),則最多會(huì)生成 2 * 366 * 24 = 17568 個(gè)文件(如果需要支持的最大延時(shí)時(shí)間更短,則生成的文件更少)。
第二層在內(nèi)存中,當(dāng)消息的投遞時(shí)間即將到來(lái)的時(shí)候,會(huì)將這個(gè)小時(shí)的消息索引(索引包括消息在schedule log中的offset和size)從磁盤(pán)文件加載到內(nèi)存中的hash wheel上,內(nèi)存中的hash wheel則是以500ms為一個(gè)刻度。
總結(jié)一下設(shè)計(jì)上的亮點(diǎn):
- 時(shí)間輪算法適合延時(shí)/定時(shí)消息的場(chǎng)景,省去延時(shí)消息的排序,插入刪除操作都是 O(1) 的時(shí)間復(fù)雜度;
- 通過(guò)多級(jí)時(shí)間輪設(shè)計(jì),支持了超大時(shí)間跨度的延時(shí)消息;
- 通過(guò)延時(shí)加載,內(nèi)存中只會(huì)有最近要消費(fèi)的消息,更久的延時(shí)消息會(huì)被存儲(chǔ)在磁盤(pán)中,對(duì)內(nèi)存友好;
- 延時(shí)消息單獨(dú)存儲(chǔ)(schedule log),不會(huì)影響到正常消息的空間回收;
總結(jié)
本文匯總了目前業(yè)界常見(jiàn)的延時(shí)消息方案,并且討論了各個(gè)方案的優(yōu)缺點(diǎn)。希望對(duì)讀者有所啟發(fā)。