輕量級消息發布訂閱:Redis的適用場景
作者:一安
在實際項目中,某些業務場景需要使用消息的發布訂閱功能來實現特殊需求。雖然常見的消息中間件如 RabbitMQ、Kafka? 和 ActiveMQ? 等提供了強大的消息處理能力,但它們通常被認為是較為“重量級”的解決方案,使用成本較高。
前言
在實際項目中,某些業務場景需要使用消息的發布訂閱功能來實現特殊需求。雖然常見的消息中間件如 RabbitMQ、Kafka 和 ActiveMQ 等提供了強大的消息處理能力,但它們通常被認為是較為“重量級”的解決方案,使用成本較高。在一些業務場景中,我們只需要實現消息的發布訂閱功能,并不需要保證消息的絕對可靠性和高性能要求。此時,使用 Redis 作為消息中間件無疑是更好的選擇。
項目如何搭建略過,可以使用 Spring Initializr 或者其他 IDEA 創建一個新的 Spring Boot 項目,并添加相關依賴即可
創建 Redis 消息發布者
創建一個服務類用于發布消息:
@Service
public class RedisPublisherService {
@Autowired
private RedisTemplate redisTemplate;
public void publishMessage(String channel, String message) {
redisTemplate.convertAndSend(channel, message);
}
}
創建 Redis 消息訂閱者
創建一個服務類用于監聽消息:
/**
* Redis 消息訂閱-消息監聽器,當收到閱訂的消息時,會將消息交給這個類處理
* 可以直接實現 MessageListener 接口,也可以繼承它的實現類 MessageListenerAdapter
* 自動多線程處理
*/
@Service
public class RedisSubscriberService implements MessageListener {
@Override
public void onMessage(Message message, byte[] pattern) {
String channel = message.getChannel().toString();
String data = new String(message.getBody());
System.out.println("Received message from channel " + channel + ": " + data);
}
}
消息監聽器綁定監聽指定通道
/**
* 自定義 RedisTemplate 序列化方式
* 配置主題訂閱 - Redis消息監聽器綁定監聽指定通道
*/
@Configuration
public class RedisConfig {
// 自定義的消息訂閱監聽器
@Resource
private RedisSubscriberService redisSubscriberService;
/**
* 自定義 RedisTemplate 序列化方式
* @param redisConnectionFactory
* @return
*/
@Bean
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>();
//綁定 RedisConnectionFactory
redisTemplate.setConnectionFactory(redisConnectionFactory);
//創建 Jackson2JsonRedisSerializer 序列方式,對象類型使用 Object 類型,
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.activateDefaultTyping(new LaissezFaireSubTypeValidator(), ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
// 設置 RedisTemplate 序列化規則,因為 key 通常是普通的字符串,所以使用StringRedisSerializer即可,而 value 是對象時,才需要使用序列化與反序列化。
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
// hash key 序列化規則
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
//屬性設置后操作
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
/**
* 配置主題訂閱
* 可以添加多個監聽器,監聽多個通道,只需要將消息監聽器與訂閱的通道/主題綁定即可。
* addMessageListener(MessageListener listener, Collection<? extends Topic> topics):將消息監聽器與多個訂閱的通道/主題綁定
* addMessageListener(MessageListener listener, Topic topic):將消息監聽器與訂閱的通道/主題綁定
* @param connectionFactory
* @return
*/
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
// 設置連接工廠,RedisConnectionFactory 可以直接從容器中取,也可以從 RedisTemplate 中取
container.setConnectionFactory(connectionFactory);
// 訂閱名稱叫test-channel的通道, 類似 Redis 中的subscribe命令
container.addMessageListener(redisSubscriberService, new ChannelTopic("test-channel"));
// 訂閱名稱以 'user-' 開頭的全部通道, 類似 Redis 的 pSubscribe 命令
container.addMessageListener(redisSubscriberService, new PatternTopic("user-*"));
return container;
}
}
測試發布與訂閱
創建一個測試類來測試發布和訂閱功能:
@Component
public class RedisTestRunner implements CommandLineRunner {
@Autowired
private RedisPublisherService publisherService;
@Autowired
private RedisSubscriberService subscriberService;
@Override
public void run(String... args) throws Exception {
// 發布消息
publisherService.publishMessage("test-channel", "Hello, yian!");
publisherService.publishMessage("user-channel", "Hello, weilai!");
}
}
責任編輯:武曉燕
來源:
一安未來