解鎖 Redis 發布訂閱模式:通過實踐演示挖掘消息通信潛能
Redis 的發布訂閱模式,正是這樣一把強大的“通信鑰匙”,它為開發者們開啟了一扇實現高效消息交互的大門。通過發布訂閱模式,一個消息生產者可以將消息發布到指定的頻道中,而多個對該頻道感興趣的消息消費者能夠同時接收到這些消息,實現了消息的一對多廣播。
在本文中,我們將通過詳細的實踐演示,深入探索 Redis 發布訂閱模式的奧秘。從基礎概念的簡要回顧,到搭建環境、編寫代碼示例,再到對運行結果的詳細分析,一步步帶你領略這一強大模式在實際項目中的應用魅力,讓你在面對類似的消息通信需求時能夠游刃有余。
一、詳解redis發布訂閱模式
1. 什么是發布與訂閱
redis發布訂閱是一種解耦生產者和消費者一種消息通信模式,訂閱者通過訂閱channel等待最新的消息,發布者按需將消息發送到指定channel上供訂閱者消費:
2. redis發布訂閱模式的使用
3. redis發布訂閱的兩種模式
redis發布訂閱有兩種方式:
- 基于精確頻道的訂閱模式,即訂閱者訂閱名為channel-1的頻道,那么只有channel-1有消息時才會通知這些訂閱。
- 基于匹配模型的訂閱模式,即訂閱者可以通過表達式訂閱頻道,例如訂閱者訂閱了channel*的頻道,那么所有前綴為channel的頻道都會向這個訂閱者發布消息。
4. 基于頻道的訂閱模式
基于頻道訂閱模式的指令格式如下,可以看到訂閱者可以訂閱多個頻道
subscribe channel [channel ...]
所以我們開啟一個redis客戶端,訂閱一個channel:sport的頻道,對應的指令和輸出結果如下,可以看到發送指令后redis服務端返回1,通知訂閱者成功訂閱該頻道:
# 客戶端1 訂閱 channel:sport
127.0.0.1:6379> SUBSCRIBE channel:sport
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "channel:sport"
3) (integer) 1
對應的我們也給出發布消息的指令格式:
publish channel message
此時,我們再開啟一個redis客戶端,發布一條消息到channel:sport的頻道:
# 另一個客戶端發送消息
127.0.0.1:6379> PUBLISH channel:sport "this is why we play"
(integer) 1
127.0.0.1:6379>
此時剛剛訂閱消息的redis客戶端就會實時的收到這條消息:
5. 基于匹配模式的發布與訂閱示例
有時候我們會訂閱多個頻道,我們不可能每次都去手動增加訂閱的頻道,例如我們當前訂閱頻道有:c1、c2、c3、c4、c5、c6、c7、c8。將來還可能出現c9等情況。我們不可能實時去添加訂閱的頻道。 觀察上面的頻道我們發現頻道都是以c開頭,后續的數字不斷變化,所以我們完全可以使用模式匹配來實現頻道訂閱。
模式訂閱和取消的命令為:
psubscribe pattern [pattern...]
punsubscribe [pattern [pattern ...]]
關于parttern常見的匹配符有:
- :表示任意占位符,例如c,可以匹配c、c1、c111
- ?*:匹配一個及以上個占位符
- ?:表示匹配一個占位符
我們希望訂閱c1-c9的頻道基于模式匹配我們就能夠做到這一點。
我們首先開啟一個客戶端,使用模式匹配發起訂閱:
# 訂閱匹配cxx相關的模式
PSUBSCRIBE c?*
然后我們在開啟另一個客戶端,發送消息到c1頻道:
127.0.0.1:6379> PUBLISH c1 "this is c1 message"
(integer) 1
127.0.0.1:6379>
剛剛訂閱的客戶端就會收到消息:
127.0.0.1:6379> PSUBSCRIBE c?*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "c?*"
3) (integer) 1
1) "pmessage"
2) "c?*"
3) "c1"
4) "this is c1 message"
注意:當你訂閱PSUBSCRIBE c?* c1訂閱時,若另一個客戶端發送消息到c1你會收到兩條消息(原因會在后文源碼解析時補充)。
如下便是PSUBSCRIBE c?* c1的收到PUBLISH c1 "this is c1 message"的消息內容:
# 訂閱c?* 和精確的c1兩個頻道
127.0.0.1:6379> PSUBSCRIBE c?* c1
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "c?*"
3) (integer) 1
1) "psubscribe"
2) "c1"
3) (integer) 2
1) "pmessage"
2) "c1"
3) "c1"
4) "this is c1 message"
1) "pmessage"
2) "c?*"
3) "c1"
4) "this is c1 message"
6. 基于spring boot集成redis落地發布與訂閱模式
我們將使用一個用戶訂閱channel:sport,一個用戶訂閱channel:stock,而另一個用戶則通過channel*訂閱兩個都訂閱:
了解了大體思路,我們就開始落地這個需求,首先第一步還是引入redis的依賴:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
然后在配置文件中給出redis文件的配置信息:
spring.redis.host=127.0.0.1
spring.redis.port=6379
隨后就是定義3個訂閱者的處理器的通用接口定義,后續我們將通過函數時編程來分別落地3個訂閱者的處理器:
public interface RedisMsgHandler {
/**
* 處理發布者發送的消息
* @param message
*/
void handleMessage(String message);
}
然后我們就可以針對redis不同的訂閱者進行配置進行聲明:
@Configuration
@EnableCaching
@Slf4j
public class RedisConfig {
/**
* Redis消息監聽器容器
*
* @param connectionFactory
* @return
*/
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
//訂閱體育消息
container.addMessageListener(listenerAdapter(msg -> log.info("收到體育新聞,消息內容:{}", msg)), new PatternTopic("channel:sport"));
//訂閱股票消息
container.addMessageListener(listenerAdapter(msg -> log.info("收到庫存消息,消息內容:{}", msg)), new PatternTopic("channel:stock"));
//channel前綴的消息都訂閱
container.addMessageListener(listenerAdapter(msg -> log.info("收到channel前綴的消息:{}", msg)), new PatternTopic("channel:*"));
return container;
}
/**
* 配置消息接收處理類
*
* @return
*/
@Bean
@Scope("prototype")
MessageListenerAdapter listenerAdapter(RedisMsgHandler handler) {
//這個地方 是給messageListenerAdapter 傳入一個消息接受的處理器,利用反射的方法調用“receiveMessage”
//也有好幾個重載方法,這邊默認調用處理器的方法 叫 handleMessage 可以自己到源碼里面看
return new MessageListenerAdapter(handler, "handleMessage");//注意2個通道調用的方法都要為 receiveMessage
}
}
最后我們啟動一個定時器定時向頻道發送消息:
@EnableScheduling
@Component
public class SenderTask {
@Autowired
private StringRedisTemplate stringRedisTemplate;
//向redis消息隊列index通道發布消息
@Scheduled(fixedRate = 2000)
public void sendMessage() {
stringRedisTemplate.convertAndSend("channel:stock", "股票漲了"+Math.random()+"個百分點");
stringRedisTemplate.convertAndSend("channel:sport", "籃球新聞 No."+((int)(Math.random()*100)+1)+"選手得分");
}
}
最后運行結果如下所示:
2025-01-11 22:39:03.129 INFO 16952 --- [ container-3] com.sharkChili.RedisConfig : 收到channel前綴的消息:股票漲了0.5562952653668436個百分點
2025-01-11 22:39:03.129 INFO 16952 --- [ container-4] com.sharkChili.RedisConfig : 收到體育新聞,消息內容:籃球新聞 No.20選手得分
2025-01-11 22:39:03.129 INFO 16952 --- [ container-5] com.sharkChili.RedisConfig : 收到channel前綴的消息:籃球新聞 No.20選手得分
2025-01-11 22:39:05.121 INFO 16952 --- [ container-6] com.sharkChili.RedisConfig : 收到庫存消息,消息內容:股票漲了0.47411355078897777個百分點
2025-01-11 22:39:05.122 INFO 16952 --- [ container-7] com.sharkChili.RedisConfig : 收到channel前綴的消息:股票漲了0.47411355078897777個百分點
2025-01-11 22:39:05.135 INFO 16952 --- [ container-9] com.sharkChili.RedisConfig : 收到channel前綴的消息:籃球新聞 No.15選手得分
2025-01-11 22:39:05.135 INFO 16952 --- [ container-8] com.sharkChili.RedisConfig : 收到體育新聞,消息內容:籃球新聞 No.15選手得分
2025-01-11 22:39:07.135 INFO 16952 --- [ container-11] com.sharkChili.RedisConfig : 收到channel前綴的消息:股票漲了0.886860372468581個百分點
2025-01-11 22:39:07.135 INFO 16952 --- [ container-10] com.sharkChili.RedisConfig : 收到庫存消息,消息內容:股票漲了0.886860372468581個百分點
2025-01-11 22:39:07.136 INFO 16952 --- [ container-12] com.sharkChili.RedisConfig : 收到體育新聞,消息內容:籃球新聞 No.89選手得分
2025-01-11 22:39:07.136 INFO 16952 --- [ container-13] com.sharkChili.RedisConfig : 收到channel前綴的消息:籃球新聞 No.89選手得分
7. 發布訂閱的常見的使用場景和優缺點
使用場景如:聊天室、公告牌、異步處理電商訂單非核心操作等需要實現消息解耦的場景都可以使用消息訂閱發布,如下所示,這就是電商下單業務對于redis發布訂閱模式的使用模型圖:
當然redis發布訂閱模式優缺點也很明顯,我們先來說說優點,redis發布訂閱模式 實現簡單,對于簡單的解耦生產者和消費者關系的應用場景綽綽有余,而缺點也是一樣,因為redis發布訂閱模式實現比較簡單,并沒有持久化機制無法保證可靠消費和故障恢復,同時,相較于Kafka、RocketMQ,Redis的實現略顯粗糙,無法實現消息堆積與回溯。
二、redis如何實現發布訂閱模型
關于redis發布訂閱模型,對于底層實現感興趣的讀者,可以參考筆者下面這篇關于pub/sub的源碼解析:《聊聊 Redis 的發布訂閱設計與實現》
三、小結
本文從redis發布訂閱模型基本介紹和幾個實踐案例角度帶讀者快速入門了一下redis發布訂閱模型的使用場景以及一些注意事項,希望對你有幫助。