成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Redis Stream 用做消息隊列完美嗎

數據庫 其他數據庫
Redis 本身定位是內存數據庫,它的設計之初都是為緩存準備的,并不具備消息堆積的能力。而專業消息隊列一個非常重要的功能是數據中轉樞紐,Redis 的定位很難滿足,使用起來要非常小心。
Redis Stream 是 Redis 5.0 版本中引入的一種新的數據結構,用于實現消息傳遞的功能。

這篇文章,分享筆者學習 Redis Stream 的心得,希望對大家有所啟發。

圖片圖片

1 基礎知識

Redis Stream 的結構如下圖所示,它是一個消息鏈表,將所有加入的消息都串起來,每個消息都有一個唯一的 ID 和對應的內容。

圖片圖片

每個 Redis Stream 都有唯一的名稱 ,對應唯一的 Redis Key 。

同一個 Stream 可以掛載多個消費者組 ConsumerGroup  , 消費組不能自動創建,需要使用 XGROUP CREATE 命令創建。

每個消費組會有個游標 last_delivered_id,任意一個消費者讀取了消息都會使游標 last_delivered_id 往前移動 ,標識當前消費組消費到哪條消息了。

消費組 ConsumerGroup 同樣可以掛載多個消費者 Consumer ,  每個 Consumer 并行的讀取消息,任意一個消費者讀取了消息都會使游標 last_delivered_id 往前移動。

消費者內部有一個屬性 pending_ids , 記錄了當前消費者讀取但沒有回復 ACK 的消息 ID 列表 。

細品 Redis stream 的設計,我們發現它和 Kafka 非常相似,比如說消費者組,消費進度偏移量等。

但內部實現差異還是很明顯,Kafka 主要是利用了文件系統的特性,每一個主題下存在多個分區 partition,而消費者處理對應分區消息成功之后,消費者會 commit 消費進度到 Broker ,Broker 修改消費進度偏移量并持久化。

2 核心命令

01 XADD 向 Stream 末尾添加消息

使用 XADD 向隊列添加消息,如果指定的隊列不存在,則創建一個隊列。基礎語法格式:

XADD key ID field value [field value ...]
  • key :隊列名稱,如果不存在就創建
  • ID :消息 id,我們使用 * 表示由 redis 生成,可以自定義,但是要自己保證遞增性。
  • field value :記錄。
127.0.0.1:6379> XADD mystream * name1 value1 name2 value2
"1712473185388-0"
127.0.0.1:6379> XLEN mystream
(integer) 1
127.0.0.1:6379> XADD mystream * name2 value2 name3 value3
"1712473231761-0"

消息 ID 使用 * 表示由 redis 生成,同時也可以自定義,但是自定義時要保證遞增性。

消息 ID 的格式:毫秒級時間戳 + 序號 , 例如:1712473185388-5 , 它表示當前消息在毫秒時間戳 1712473185388 產生 ,并且該毫秒內產生到了第5條消息。

在添加隊列消息時,也可以指定隊列的長度。

127.0.0.1:6379> XADD mystream MAXLEN 100 * name value1 age 30
"1713082205042-0"

使用  XADD 命令向 mystream 的 stream 中添加了一條消息,并且指定了最大長度為 100。消息的 ID 由 Redis 自動生成,消息包含兩個字段 name 和 age,分別對應的值是 value1 和 30。

02 XRANGE 獲取消息列表

使用 XRANGE 獲取消息列表,會自動過濾已經刪除的消息。語法格式:

XRANGE key start end [COUNT count]
  • key :隊列名
  • start :開始值, - 表示最小值
  • end :結束值, + 表示最大值
  • count :數量
127.0.0.1:6379> XRANGE mystream - + COUNT 2
1) 1) "1712473185388-0"
   2) 1) "name1"
      2) "value1"
      3) "name2"
      4) "value2"
2) 1) "1712473231761-0"
   2) 1) "name2"
      2) "value2"
      3) "name3"
      4) "value3"

我們得到兩條消息,第一層是消息 ID ,第二層是消息內容 ,消息內容是 Hash 數據結構 。

03 XREAD 以阻塞/非阻塞方式獲取消息列表

使用 XREAD 以阻塞或非阻塞方式獲取消息列表 ,語法格式:

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
  • count :數量
  • milliseconds :可選,阻塞毫秒數,沒有設置就是非阻塞模式
  • key :隊列名
  • id :消息 ID
127.0.0.1:6379> XREAD streams mystream 0-0
1) 1) "mystream"
   2) 1) 1) "1712473185388-0"
         2) 1) "name1"
            2) "value1"
            3) "name2"
            4) "value2"
      2) 1) "1712473231761-0"
         2) 1) "name2"
            2) "value2"
            3) "name3"
            4) "value3"

XRED 讀消息時分為阻塞和非阻塞模式,使用 BLOCK 選項可以表示阻塞模式,需要設置阻塞時長。非阻塞模式下,讀取完畢(即使沒有任何消息)立即返回,而在阻塞模式下,若讀取不到內容,則阻塞等待。

127.0.0.1:6379> XREAD block 1000 streams mystream $
(nil)
(1.07s)

使用 Block 模式,配合 $ 作為 ID ,表示讀取最新的消息,若沒有消息,命令阻塞!等待過程中,其他客戶端向隊列追加消息,則會立即讀取到。

因此,典型的隊列就是 XADD 配合 XREAD Block 完成。XADD 負責生成消息,XREAD 負責消費消息。

04 XGROUP CREATE 創建消費者組

使用 XGROUP CREATE 創建消費者組,分兩種情況:

  • 從頭開始消費:
XGROUP CREATE mystream consumer-group-name 0-0
  • 從尾部開始消費:
XGROUP CREATE mystream consumer-group-name $

執行效果如下:

127.0.0.1:6379> XGROUP CREATE mystream mygroup 0-0
OK

05 XREADGROUP GROUP 讀取消費組中的消息

使用 XREADGROUP GROUP 讀取消費組中的消息,語法格式:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
  • group :消費組名
  • consumer :消費者名。
  • count :讀取數量。
  • milliseconds :阻塞毫秒數。
  • key :隊列名。
  • ID :消息 ID。

示例:

127.0.0.1:6379>  XREADGROUP group mygroup consumerA count 1 streams mystream >
1) 1) "mystream"
   2) 1) 1) "1712473185388-0"
         2) 1) "name1"
            2) "value1"
            3) "name2"
            4) "value2"

消費者組 mygroup 中的消費者 consumerA ,從 名為 mystream 的 Stream 中讀取消息。

  • COUNT 1 表示一次最多讀取一條消息
  • > 表示消息的起始位置是當前可用消息的 ID,即從當前未讀取的最早消息開始讀取。

06 XACK 消息消費確認

接收到消息之后,我們要手動確認一下(ack),語法格式:

xack key group-key ID [ID ...]

示例:

127.0.0.1:6379> XACK mystream mygroup 1713089061658-0
(integer) 1

消費確認增加了消息的可靠性,一般在業務處理完成之后,需要執行 ack 確認消息已經被消費完成,整個流程的執行如下圖所示:

圖片圖片

我們可以使用 xpending 命令查看消費者未確認的消息ID:

127.0.0.1:6379> xpending mystream mygroup
1) (integer) 1
2) "1713091227595-0"
3) "1713091227595-0"
4) 1) 1) "consumerA"
      2) "1"

07 XTRIM 限制 Stream 長度

我們使用 XTRIM 對流進行修剪,限制長度, 語法格式:

127.0.0.1:6379> XADD mystream * field1 A field2 B field3 C field4 D
"1712535017402-0"
127.0.0.1:6379> XTRIM mystream MAXLEN 2
(integer) 4
127.0.0.1:6379> XRANGE mystream - +
1) 1) "1712498239430-0"
   2) 1) "name"
      2) "zhangyogn"
2) 1) "1712535017402-0"
   2) 1) "field1"
      2) "A"
      3) "field2"
      4) "B"
      5) "field3"
      6) "C"
      7) "field4"
      8) "D"

通過學習這七個核心命令,筆者發現 Redis Stream 既支持簡單的生產消費模型,也支持發布訂閱模型。

3 SpringBoot Redis Stream 實戰

演示代碼地址:

https://github.com/makemyownlife/courage-cache-demo

1、添加 SpringBoot Redis 依賴

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

2、yaml 文件配置

圖片圖片

3、RedisTemplate 配置

圖片圖片

4、定義stream監聽器

圖片圖片

5、定義 StreamContainer 并啟動

圖片圖片

6、發送消息

圖片圖片

執行完成之后,消費者就可以打印如下日志:

圖片圖片

通過實戰流程,我們發現 SpringBoot 整合 Redis Stream 并不復雜,需要注意:定義注冊信息時,需要確保 stream key 和消費者組已經創建好了。

4 Redis stream 用做消息隊列完美嗎

筆者認為 Redis stream 用于消息隊列最大的進步在于:實現了發布訂閱模型。

發布訂閱模型具有如下特點:

  • 消費獨立相比隊列模型的匿名消費方式,發布訂閱模型中消費方都會具備的身份,一般叫做訂閱組(訂閱關系),不同訂閱組之間相互獨立不會相互影響。
  • 一對多通信基于獨立身份的設計,同一個主題內的消息可以被多個訂閱組處理,每個訂閱組都可以拿到全量消息。因此發布訂閱模型可以實現一對多通信。

我們曾經詬病 Redis List 數據結構用做隊列時,因為消費時沒有 Ack 機制,應用異常掛掉導致消息偶發丟失的情況,Redis Stream 從設計角度來講已經完美的解決了。

因為消費者內部有一個屬性 pending_ids , 記錄了當前消費者讀取但沒有回復 ACK 的消息 ID 列表 。當消費者重新上線,這些消息可以重新被消費。

但 Redis stream 用做消息隊列完美嗎 ?

這個還真沒有,原因在于 Redis 先天不足。

1、Redis 本身定位是內存數據庫,它的設計之初都是為緩存準備的,并不具備消息堆積的能力。而專業消息隊列一個非常重要的功能是數據中轉樞紐,Redis 的定位很難滿足,使用起來要非常小心。

2、Redis 的高可用方案可能丟失消息(AOF 持久化和主從復制都是異步 ),而專業消息隊列可以針對不同的場景選擇不同的高可用策略。也就說,專業消息隊列的高可用方案依然有可能會丟失消息,但是它有完善的方案支持不丟失消息。

所以,筆者認為 Redis 非常適合輕量級消息隊列解決方案,即:數據量可控  + 業務模型簡單 + 消息堆積概率低 + 報警監控 。

責任編輯:武曉燕 來源: 勇哥Java實戰
相關推薦

2022-02-28 08:42:49

RedisStream消息隊列

2022-04-12 11:15:31

Redis消息隊列數據庫

2024-03-22 12:10:39

Redis消息隊列數據庫

2024-10-25 08:41:18

消息隊列RedisList

2023-12-30 13:47:48

Redis消息隊列機制

2017-10-11 15:08:28

消息隊列常見

2022-01-15 07:20:18

Redis List 消息隊列

2022-01-21 19:22:45

RedisList命令

2024-09-18 07:00:00

消息隊列中間件消息隊列

2020-01-14 15:08:44

Redis5Streams數據庫

2023-09-12 14:58:00

Redis

2018-03-29 08:38:10

2021-04-23 13:20:13

Redis數據庫代碼

2023-10-24 08:25:20

TCC模式事務

2025-06-27 10:41:04

Redis數據庫集群

2021-02-19 09:19:11

消息隊列場景

2009-11-09 11:15:06

WCF消息隊列

2010-04-13 17:00:43

Unix消息隊列

2010-04-21 12:12:56

Unix 消息隊列

2012-09-24 11:48:05

IBMdw
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 国产成人精品一区二区三区视频 | 无码日韩精品一区二区免费 | 这里有精品 | 欧美在线天堂 | av在线免费观看不卡 | 成人免费看黄 | 成人看片在线观看 | 日韩伦理一区二区 | 91中文字幕在线 | 国产精品无码专区在线观看 | 久久看片| av网址在线| av一级久久 | 免费在线观看av | 国产精品theporn | 精品国产区 | 成人不卡 | 成年人视频在线免费观看 | 欧美日韩午夜精品 | xxxxx黄色片 欧美一区免费 | 亚洲欧美激情网 | 人人爽日日躁夜夜躁尤物 | 欧美黄色免费网站 | h视频在线观看免费 | 久久久精品亚洲 | 国产精品久久久久久久久久 | 翔田千里一区二区 | 亚州精品天堂中文字幕 | 精品久久久久久久 | 日本一二三区高清 | 亚洲精品一区二区三区中文字幕 | 精品视频一区二区三区 | 999久久久久久久久6666 | 成人免费视频网址 | 狠狠操av| 成人h免费观看视频 | 日日草天天干 | 日日爱夜夜操 | 国产精品久久久久9999鸭 | 日韩一区av | 日韩一区二 |