Spring Boot + Redis Streams :構建高效消息系統
前言
在現代微服務架構中,可靠的消息處理系統是保證系統高可用性和擴展性的關鍵。Redis Streams
作為Redis 5.0
引入的強大功能,提供了一種日志數據結構,能夠高效地處理消息隊列和流數據。
簡介
圖片
Redis Streams
是一種日志數據結構,類似于Apache Kafka
中的分區日志,提供了持久化、可回溯、消息分組等特性。它支持生產者消費者模型,允許生產者將消息追加到流的末尾,消費者從流中讀取消息進行處理。
Redis Streams
的主要特性包括:
- 消息持久化:消息存儲在
Redis
內存中,并可通過持久化策略(如RDB、AOF
)保證數據不丟失。 - 消息分組:支持將消費者劃分為不同的分組,每個分組可以獨立消費消息,實現消息的并行處理。
- 消息確認機制:消費者處理完消息后,可以向流發送確認消息,確保消息不會被重復處理。
- 消息回溯:可以從任意位置讀取消息,支持歷史消息的查詢和重放。
效果圖
圖片
消息生產與消費實踐
創建消息實體類
@Data
public class Message implements Serializable {
private String id;
private String content;
}
生產者服務
@Service
public class MessageProducer {
private static final String STREAM_KEY = "message-stream";
private final RedisTemplate<String, Object> redisTemplate;
public MessageProducer(RedisTemplate<String, Object> redisTemplate) {
this.redisTemplate = redisTemplate;
}
public RecordId sendMessage(Message message) {
StreamOperations<String, Object, Object> streamOps = redisTemplate.opsForStream();
Map<String, Object> messageMap = new HashMap<>();
messageMap.put("id", message.getId());
messageMap.put("content", message.getContent());
return streamOps.add(MapRecord.create(STREAM_KEY, messageMap));
}
}
配置消費者(組)
@Slf4j
@Service
public class MessageConsumer implements StreamListener<String, MapRecord<String, String, String>> {
private static final String STREAM_KEY = "message-stream";
private static final String GROUP_NAME = "message-group";
private static final String CONSUMER_NAME = "consumer-1";
private final RedisTemplate<String, Object> redisTemplate;
private StreamMessageListenerContainer<String, MapRecord<String, String, String>> container;
public MessageConsumer(RedisTemplate<String, Object> redisTemplate) {
this.redisTemplate = redisTemplate;
}
@PostConstruct
public void init() {
String script = "if redis.call('EXISTS', KEYS[1]) == 0 then " +
" return 1 " +
"else " +
" return 0 " +
"end";
RedisScript<Long> redisScript = RedisScript.of(script, Long.class);
Long result = redisTemplate.execute(redisScript, Collections.singletonList(streamKey));
if (result != null && result == 1) {
redisTemplate.opsForStream().createGroup(streamKey, ReadOffset.latest(), groupName);
log.info("消費者組 {} 創建成功", groupName);
} else {
log.info("消費者組 {} 已存在", groupName);
}
// 配置消息監聽容器
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String,
MapRecord<String, String, String>> options =
StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
.batchSize(10)
.pollTimeout(Duration.ofMillis(100))
.build();
container = StreamMessageListenerContainer.create(redisTemplate.getConnectionFactory(), options);
/**
* ReadOffset.latest():指定組的起始位置為 “當前最新消息”
* ReadOffset.lastConsumed():從消費者組的最后確認位置開始讀取。如果是新組,默認從$(最新位置)開始,確保消息至少被消費一次(At Least Once)
* ReadOffset.from(String id):從指定的消息 ID 開始讀取
* id="0-0":從流的起始位置(第一條消息)開始讀取所有歷史消息
* id="$":等價于ReadOffset.latest(),從尾部開始讀取新消息
* id="具體消息ID":從指定 ID 的下一條消息開始讀取
**/
container.receive(
Consumer.from(GROUP_NAME, CONSUMER_NAME),
StreamOffset.create(STREAM_KEY, ReadOffset.lastConsumed()),this);
container.start();
// 驗證容器是否啟動成功
if (container.isRunning()) {
log.info("消息監聽容器已啟動");
} else {
log.warn("消息監聽容器啟動失敗");
}
}
@Override
public void onMessage(MapRecord<String, String, String> message) {
try {
Map<String, String> value = message.getValue();
String id = value.get("id");
String content = value.get("content");
// 處理消息
System.out.println("收到消息: ID=" + id + ", 內容=" + content);
// 業務處理邏輯...
// 確認消息處理完成
redisTemplate.opsForStream().acknowledge(STREAM_KEY, GROUP_NAME, message.getId());
} catch (Exception e) {
// 處理異常,可以記錄日志或實現重試邏輯
System.err.println("消息處理失敗: " + e.getMessage());
}
}
@PreDestroy
public void destroy() {
if (container != null) {
container.stop();
}
}
public void consumeMessages() {
StreamOperations<String, String, String> streamOps = redisTemplate.opsForStream();
List<MapRecord<String, String, String>> messages = streamOps.read(
Consumer.from(GROUP_NAME, CONSUMER_NAME),
StreamReadOptions.empty().count(10).block(Duration.ofSeconds(10)),
StreamOffset.create(STREAM_KEY, ReadOffset.lastConsumed())
);
messages.forEach(message -> {
Map<String, String> value = message.getValue();
String id = value.get("id");
String content = value.get("content");
System.out.println("Received message: id=" + id + ", cnotallow=" + content);
// 確認消息已處理
streamOps.acknowledge(STREAM_KEY, GROUP_NAME, message.getId());
});
}
}
檢查流和消費者組狀態
# 查看流信息
XLEN message-stream
# 查看消費者組信息
XINFO GROUPS message-stream
# 查看組消費情況
XINFO CONSUMERS message-stream message-group
注意:
- 在創建
Redis Streams
消費者組時,不能使用ReadOffset.lastConsumed()
,當你創建一個新的消費者組時,Redis
要求你明確指定組的初始讀取位置(即從哪個消息ID
開始消費)組的狀態尚未初始化:新組沒有任何消費記錄,
lastConsumed()
無法確定起始位置
Redis API
設計:創建組的命令(XGROUP CREATE
)必須包含一個固定的偏移量參數(如0-0
或$
)
Redis Streams 和 Redis Pub-Sub 之間的主要區別
特性 | Redis Streams | Redis Pub-Sub |
消息持久性 | 支持 | 不支持 |
投遞保證 | 即使消費者離線也能投遞 | 無(未被消費的消息會丟失) |
重放能力 | 支持(可通過 ID 讀取歷史消息) | 不支持(僅支持實時消息) |
消息有序性 | 有保證(基于消息 ID) | 無保證 |
消費者協調 | 支持(通過消費者組實現) | 不支持 |
多消費者支持 | 支持(通過消費者組實現并發消費) | 支持(消息廣播至所有訂閱者) |