一文帶你了解 Redis 的發布與訂閱的底層原理
01、前言
發布訂閱系統在我們日常的工作中經常會使用到,這種場景大部分情況我們都是使用消息隊列的,常用的消息隊列有 Kafka,RocketMQ,RabbitMQ,每一種消息隊列都有其特性。這篇文章主要是給大家介紹 Redis 的發布訂閱系統,很多時候我們可能不需要獨立部署相應的消息隊列,只是簡單的使用,而且數據量也不會太大,這種情況下,我們就可以使用 Redis 的 Pub/Sub 模型。
02、使用方式
2.1 發布與訂閱
Redis 的發布訂閱功能主要由 PUBLISH,SUBSCRIBE,PSUBSCRIBE 命令組成,一個或者多個客戶端訂閱某個或者多個頻道,當其他客戶端向該頻道發送消息的時候,訂閱了該頻道的客戶端都會收到對應的消息。
上圖中有四個客戶端,Client 02,Client 03,Client 04 訂閱了同一個 Sport 頻道(Channel),這時當 Client 01 向 Sport Channel 發送消息 “basketball” 的時候,02-04 這三個客戶端都同時收到了這條消息。
整個過程的執行命令如下:
首先開四個 Redis 的客戶端,然后在 Client 02,Client 03,Client 04 中輸入subscribe sport 命令,表示訂閱 sport 這個頻道
然后在 Client 01 的客戶端中輸入publish sport basketball 表示向 sport 頻道發送消息 "basketball"
這個時候我們在去看下 Client 02-04 的客戶端,可以看到已經收到了消息了,每個訂閱了這個頻道的客戶端都是一樣的。
這里 Client 02-Client 04 三個客戶端訂閱了 Sport 頻道,我們叫做訂閱者(subscriber),Client 01 發布消息,我們叫做發布者(publisher),發送的消息就是 message。
2.2、模式訂閱
前面我們看到的是一個客戶端訂閱了一個 Channel,事實上單個客戶端也可以同時訂閱多個 Channel,采用模式匹配的方式,一個客戶端可以同時訂閱多個 Channel。
如上圖 Client 05 通過命令subscribe run 訂閱了 run 頻道,Client 06 通過命令psubscribe run* 訂閱了 run* 匹配的頻道。當 Client 07 向 run 頻道發送消息 666 的時候,05 和 06 兩個客戶端都收到消息了;接下來 Client 07 向 run1 和 run_sport 兩個頻道發送消息的時候,Client 06 依舊可以收到消息,而 Client 05 就收不到了消息了。
Client 05 訂閱run 頻道和接收到消息:
Client 06 訂閱run* 頻道和接收到消息:
Client 07 向多個頻道發送消息:
通過上面的案例,我們學會了一個客戶端可以訂閱單個或者多個頻道,分別通過subscribe,psubscribe 命令,客戶端可以通過 publish 發送相應的消息。
在命令行中我們可以用 Ctrl + C 來取消相關訂閱,對應的命令時 unsubscribe channelName。
03、Pub/Sub 底層存儲結構
3.1、訂閱 Channel
在 Redis 的底層結構中,客戶端和頻道的訂閱關系是通過一個字典加鏈表的結構保存的,形式如下:
在 Redis 的底層結構中,Redis 服務器結構體中定義了一個 pubsub_channels 字典
struct redisServer { //用于保存所有頻道的訂閱關系 dict *pubsub_channels;}
在這個字典中,key 代表的是頻道名稱,value 是一個鏈表,這個鏈表里面存放的是所有訂閱這個頻道的客戶端。
所以當有客戶端執行訂閱頻道的動作的時候,服務器就會將客戶端與被訂閱的頻道在 pubsub_channels 字典中進行關聯。
這個時候有兩種情況:
該渠道是首次被訂閱:首次被訂閱說明在字典中并不存在該渠道的信息,那么程序首先要創建一個對應的 key,并且要賦值一個空鏈表,然后將對應的客戶端加入到鏈表中。此時鏈表只有一個元素。
該渠道已經被其他客戶端訂閱過:這個時候就直接將對應的客戶端信息添加到鏈表的末尾就好了。
比如,如果有一個新的客戶端 Client 08 要訂閱 run 渠道,那么上圖就會變成
如果 Client 08 要訂閱一個新的渠道 new_sport ,那么就會變成
整個訂閱的過程可以采用下面偽代碼來實現
- Map<String, List<Object>> pubsub_channels = new HashMap<>();
- public void subscribe(String[] subscribeList, Object client) {
- //遍歷所有訂閱的 channel,檢查是否在 pubsub_channels 中,不在則創建新的 key 和空鏈表
- for (int i = 0; i < subscribeList.length; i++) {
- if (!pubsub_channels.containsKey(subscribeList[i])) {
- pubsub_channels.put(subscribeList[i], new ArrayList<>());
- }
- pubsub_channels.get(subscribeList[i]).add(client);
- }
- }
3.2 取消訂閱
上面介紹的是單個 Channel 的訂閱,相反的如果一個客戶端要取消訂閱相關 Channel,則無非是找到對應的 Channel 的鏈表,從中刪除對應的客戶端,如果該客戶端已經是最后一個了,則將對應 Channel 也刪除。
- public void unSubscribe(String[] subscribeList, Object client) {
- //遍歷所有訂閱的 channel,依次刪除
- for (int i = 0; i < subscribeList.length; i++) {
- pubsub_channels.get(subscribeList[i]).remove(client);
- //如果長度為 0 則清楚 channel
- if (pubsub_channels.get(subscribeList[i]).size() == 0) {
- remove(subscribeList[i]);
- }
- }
- }
04、模式訂閱結構
模式渠道的訂閱與單個渠道的訂閱類似,不過服務器是將所有模式的訂閱關系都保存在服務器狀態的pubsub_patterns 屬性里面。
- struct redisServer{
- //保存所有模式訂閱關系
- list *pubsub_patterns;
- }
與訂閱單個 Channel 不同的是,pubsub_patterns 屬性是一個鏈表,不是字典。節點的結構如下:
- struct pubsubPattern{
- //訂閱模式的客戶端
- redisClient *client;
- //被訂閱的模式
- robj *pattern;
- } pubsubPattern;
其實 client 屬性是用來存放對應客戶端信息,pattern 是用來存放客戶端對應的匹配模式。
所以對應上面的 Client-06 模式匹配的結構存儲如下
在pubsub_patterns鏈表中有一個節點,對應的客戶端是 Client-06,對應的匹配模式是run*。
4.1、訂閱模式
當某個客戶端通過命令psubscribe 訂閱對應模式的 Channel 時候,服務器會創建一個節點,并將 Client 屬性設置為對應的客戶端,pattern 屬性設置成對應的模式規則,然后添加到鏈表尾部。
對應的偽代碼如下:
- List<PubSubPattern> pubsub_patterns = new ArrayList<>();
- public void psubscribe(String[] subscribeList, Object client) {
- //遍歷所有訂閱的 channel,創建節點
- for (int i = 0; i < subscribeList.length; i++) {
- PubSubPattern pubSubPattern = new PubSubPattern();
- pubSubPattern.client = client;
- pubSubPattern.pattern = subscribeList[i];
- pubsub_patterns.add(pubSubPattern);
- }
- }
- 創建新節點;
- 給節點的屬性賦值;
- 將節點添加到鏈表的尾部;
4.2、退訂模式
退訂模式的命令是punsubscribe,客戶端使用這個命令來退訂一個或者多個模式 Channel。服務器接收到該命令后,會遍歷pubsub_patterns鏈表,將匹配到的 client 和 pattern 屬性的節點給刪掉。這里需要判斷 client 屬性和 pattern 屬性都合法的時候再進行刪除。
偽代碼如下:
- public void punsubscribe(String[] subscribeList, Object client) {
- //遍歷所有訂閱的 channel 相同 client 和 pattern 屬性的節點會刪除
- for (int i = 0; i < subscribeList.length; i++) {
- for (int j = 0; j < pubsub_patterns.size(); j++) {
- if (pubsub_patterns.get(j).client == client
- && pubsub_patterns.get(j).pattern == subscribeList[i]) {
- remove(pubsub_patterns);
- }
- }
- }
- }
遍歷所有的節點,當匹配到相同 client 屬性和 pattern 屬性的時候就進行節點刪除。
05、發布消息
發布消息比較好容易理解,當一個客戶端執行了publish channelName message 命令的時候,服務器會從pubsub_channels和pubsub_patterns 兩個結構中找到符合channelName 的所有 Channel,進行消息的發送。在 pubsub_channels 中只要找到對應的 Channel 的 key 然后向對應的 value 鏈表中的客戶端發送消息就好。
06、總結
這篇文章主要給大家介紹了一下 Redis 的發布/訂閱的使用方式和底層的存儲結構以及部分偽代碼的實現,希望對大家有幫助。