成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Spring Boot + Redis Streams :構建高效消息系統

數據庫 Redis
Redis Streams是一種日志數據結構,類似于Apache Kafka中的分區日志,提供了持久化、可回溯、消息分組等特性。它支持生產者消費者模型,允許生產者將消息追加到流的末尾,消費者從流中讀取消息進行處理。

前言

在現代微服務架構中,可靠的消息處理系統是保證系統高可用性和擴展性的關鍵。Redis Streams作為Redis 5.0引入的強大功能,提供了一種日志數據結構,能夠高效地處理消息隊列和流數據。

簡介

圖片圖片

Redis Streams是一種日志數據結構,類似于Apache Kafka中的分區日志,提供了持久化、可回溯、消息分組等特性。它支持生產者消費者模型,允許生產者將消息追加到流的末尾,消費者從流中讀取消息進行處理。

Redis Streams的主要特性包括:

  • 消息持久化:消息存儲在Redis內存中,并可通過持久化策略(如 RDB、AOF)保證數據不丟失。
  • 消息分組:支持將消費者劃分為不同的分組,每個分組可以獨立消費消息,實現消息的并行處理。
  • 消息確認機制:消費者處理完消息后,可以向流發送確認消息,確保消息不會被重復處理。
  • 消息回溯:可以從任意位置讀取消息,支持歷史消息的查詢和重放。

效果圖

圖片圖片

消息生產與消費實踐

創建消息實體類

@Data
public class Message implements Serializable {
    private String id;
    private String content;
}

生產者服務

@Service
public class MessageProducer {
    private static final String STREAM_KEY = "message-stream";
    private final RedisTemplate<String, Object> redisTemplate;

    public MessageProducer(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    public RecordId sendMessage(Message message) {
        StreamOperations<String, Object, Object> streamOps = redisTemplate.opsForStream();
        Map<String, Object> messageMap = new HashMap<>();
        messageMap.put("id", message.getId());
        messageMap.put("content", message.getContent());

        return streamOps.add(MapRecord.create(STREAM_KEY, messageMap));
    }
}

配置消費者(組)

@Slf4j
@Service
public class MessageConsumer implements StreamListener<String, MapRecord<String, String, String>> {
    private static final String STREAM_KEY = "message-stream";
    private static final String GROUP_NAME = "message-group";
    private static final String CONSUMER_NAME = "consumer-1";
    private final RedisTemplate<String, Object> redisTemplate;
    private StreamMessageListenerContainer<String, MapRecord<String, String, String>> container;

    public MessageConsumer(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    @PostConstruct
    public void init() {
        String script = "if redis.call('EXISTS', KEYS[1]) == 0 then " +
                "  return 1 " +
                "else " +
                "  return 0 " +
                "end";
        RedisScript<Long> redisScript = RedisScript.of(script, Long.class);
        Long result = redisTemplate.execute(redisScript, Collections.singletonList(streamKey));

        if (result != null && result == 1) {
            redisTemplate.opsForStream().createGroup(streamKey, ReadOffset.latest(), groupName);
            log.info("消費者組 {} 創建成功", groupName);
        } else {
            log.info("消費者組 {} 已存在", groupName);
        }

        // 配置消息監聽容器
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String,
                MapRecord<String, String, String>> options =
                StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
                        .batchSize(10)
                        .pollTimeout(Duration.ofMillis(100))
                        .build();

        container = StreamMessageListenerContainer.create(redisTemplate.getConnectionFactory(), options);

        /**
         *  ReadOffset.latest():指定組的起始位置為 “當前最新消息”
         *  ReadOffset.lastConsumed():從消費者組的最后確認位置開始讀取。如果是新組,默認從$(最新位置)開始,確保消息至少被消費一次(At Least Once)
         *  ReadOffset.from(String id):從指定的消息 ID 開始讀取
         *   id="0-0":從流的起始位置(第一條消息)開始讀取所有歷史消息
         *   id="$":等價于ReadOffset.latest(),從尾部開始讀取新消息
         *   id="具體消息ID":從指定 ID 的下一條消息開始讀取
        **/
        container.receive(
                Consumer.from(GROUP_NAME, CONSUMER_NAME),
                StreamOffset.create(STREAM_KEY, ReadOffset.lastConsumed()),this);

        container.start();
        // 驗證容器是否啟動成功
        if (container.isRunning()) {
            log.info("消息監聽容器已啟動");
        } else {
            log.warn("消息監聽容器啟動失敗");
        }
    }

    @Override
    public void onMessage(MapRecord<String, String, String> message) {
        try {
            Map<String, String> value = message.getValue();
            String id = value.get("id");
            String content = value.get("content");

            // 處理消息
            System.out.println("收到消息: ID=" + id + ", 內容=" + content);

            // 業務處理邏輯...

            // 確認消息處理完成
            redisTemplate.opsForStream().acknowledge(STREAM_KEY, GROUP_NAME, message.getId());
        } catch (Exception e) {
            // 處理異常,可以記錄日志或實現重試邏輯
            System.err.println("消息處理失敗: " + e.getMessage());
        }
    }

    @PreDestroy
    public void destroy() {
        if (container != null) {
            container.stop();
        }
    }

    public void consumeMessages() {
        StreamOperations<String, String, String> streamOps = redisTemplate.opsForStream();
        List<MapRecord<String, String, String>> messages = streamOps.read(
                Consumer.from(GROUP_NAME, CONSUMER_NAME),
                StreamReadOptions.empty().count(10).block(Duration.ofSeconds(10)),
                StreamOffset.create(STREAM_KEY, ReadOffset.lastConsumed())
        );

        messages.forEach(message -> {
            Map<String, String> value = message.getValue();
            String id = value.get("id");
            String content = value.get("content");
            System.out.println("Received message: id=" + id + ", cnotallow=" + content);
            // 確認消息已處理
            streamOps.acknowledge(STREAM_KEY, GROUP_NAME, message.getId());
        });
    }
}

檢查流和消費者組狀態

# 查看流信息
XLEN message-stream

# 查看消費者組信息
XINFO GROUPS message-stream

# 查看組消費情況
XINFO CONSUMERS message-stream message-group

注意

  • 在創建Redis Streams消費者組時,不能使用ReadOffset.lastConsumed(),當你創建一個新的消費者組時,Redis要求你明確指定組的初始讀取位置(即從哪個消息ID開始消費)

組的狀態尚未初始化:新組沒有任何消費記錄,lastConsumed()無法確定起始位置

Redis API設計:創建組的命令(XGROUP CREATE)必須包含一個固定的偏移量參數(如0-0$

Redis Streams 和 Redis Pub-Sub 之間的主要區別

特性

Redis Streams

Redis Pub-Sub

消息持久性

支持

不支持

投遞保證

即使消費者離線也能投遞

無(未被消費的消息會丟失)

重放能力

支持(可通過 ID 讀取歷史消息)

不支持(僅支持實時消息)

消息有序性

有保證(基于消息 ID)

無保證

消費者協調

支持(通過消費者組實現)

不支持

多消費者支持

支持(通過消費者組實現并發消費)

支持(消息廣播至所有訂閱者)


責任編輯:武曉燕 來源: 一安未來
相關推薦

2025-06-05 08:00:00

Go事件驅動系統編程

2025-05-16 07:24:41

Springkafka腳手架

2023-11-07 10:01:34

2020-01-14 15:08:44

Redis5Streams數據庫

2024-10-25 08:41:18

消息隊列RedisList

2023-07-10 08:26:19

2023-09-01 08:46:44

2025-03-31 08:39:55

2018-12-05 09:00:00

RedisRedis Strea數據庫

2023-10-11 14:37:21

工具開發

2021-09-15 09:02:20

Spring 6Spring BootJava

2021-09-03 06:46:34

Spring 6pring Boot 項目

2021-01-12 08:43:29

Redis ListStreams

2019-01-15 11:40:14

開發技能代碼

2025-05-29 01:33:00

微服務架構系統

2025-05-13 07:13:25

2022-10-10 08:00:00

微服務Spring Boo容器

2025-02-05 12:28:44

2024-03-26 08:08:08

SpringBPMN模型

2025-01-08 10:35:26

代碼開發者Spring
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 国产免费va | 欧美一级视频免费看 | 天天干天天爱天天操 | 少妇特黄a一区二区三区88av | 成人国产免费视频 | 精品国产精品国产偷麻豆 | 99久久免费精品 | 国产一区二区三区久久久久久久久 | 草草草草视频 | 亚洲一区二区三区在线视频 | 99精品一级欧美片免费播放 | 91精品国产综合久久久久久 | 日韩中文字幕区 | 精品中文在线 | 久久99国产精品 | 深夜福利亚洲 | 国产精品久久欧美久久一区 | 欧美日韩专区 | 国产一级一级毛片 | 亚洲国产aⅴ成人精品无吗 综合国产在线 | 好姑娘影视在线观看高清 | 欧美黄色录像 | 在线免费观看黄色网址 | 中文字幕 在线观看 | 超碰在线播 | 天天干干| 狠狠狠色丁香婷婷综合久久五月 | 日韩欧美理论片 | 日批av| 欧美久久一区二区三区 | 欧美一区二区小视频 | 欧美日韩高清免费 | 国产精品国产三级国产aⅴ中文 | 欧美美女二区 | 欧美一二三 | 一区二区三区在线观看视频 | 五月天综合影院 | 97视频免费 | 中文日韩在线视频 | 国产99视频精品免费播放照片 | 久久伊人亚洲 |