成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

深入淺出RabbitMQ:順序消費(fèi)、死信隊(duì)列和延時(shí)隊(duì)列

開發(fā) 架構(gòu)
RabbitMQ 是一個(gè)功能強(qiáng)大的消息中間件,它在許多互聯(lián)網(wǎng)應(yīng)用中扮演了關(guān)鍵角色,比如華為攝像機(jī) SDK 的監(jiān)控圖像數(shù)據(jù)上報(bào),大部分電商系統(tǒng)的異步消費(fèi)等等。?

大家好,我是小?,一個(gè)漂泊江湖多年的 985 非科班程序員,曾混跡于國企、互聯(lián)網(wǎng)大廠和創(chuàng)業(yè)公司的后臺(tái)開發(fā)攻城獅。

1. 引言

在今天的文章中,我們來聊一聊 RabbitMQ,這是小 ? 在工作中用的最早的消息中間件,主要用于大量數(shù)據(jù)的異步消費(fèi)。

2. RabbitMQ

2.1 核心組件

RabbitMQ 是一個(gè)開源的消息中間件,它實(shí)現(xiàn)了高級(jí)消息隊(duì)列協(xié)議(AMQP),同時(shí)提供了各種重要組件來支持消息的生產(chǎn)、傳輸和消費(fèi)。

圖片圖片

  • Producer(生產(chǎn)者): 生產(chǎn)者是消息的發(fā)送方,負(fù)責(zé)將消息發(fā)布到 RabbitMQ 服務(wù)器。消息可以包含任何內(nèi)容,例如任務(wù)、日志、通知等。
  • Channel(信道):消息推送與接收時(shí)使用的通道。
  • Exchange(交換機(jī)): 交換機(jī)是消息的中轉(zhuǎn)站,它接收來自生產(chǎn)者的消息并將其路由到一個(gè)或多個(gè)隊(duì)列。不同類型的交換機(jī),如 fanout,direct,topic,headers,支持不同的路由規(guī)則。
  • Queue(隊(duì)列): 隊(duì)列是消息的緩沖區(qū),消息在發(fā)送到消費(fèi)者之前存儲(chǔ)在隊(duì)列中,消費(fèi)者從隊(duì)列中獲取消息并進(jìn)行處理。
  • Consumer(消費(fèi)者): 消費(fèi)者是消息的接收方,它從隊(duì)列中獲取消息并進(jìn)行處理。消費(fèi)者可以是多個(gè),它們可以在不同的應(yīng)用程序或服務(wù)器上運(yùn)行。

2.2 工作流程

RabbitMQ 的工作方式是基于生產(chǎn)者、交換機(jī)和隊(duì)列之間的協(xié)作。這是一個(gè)簡單的消息傳遞過程:

  • 將隊(duì)列與交換機(jī)綁定(Binding)起來,定義了消息的路由規(guī)則;
  • 生產(chǎn)者將消息發(fā)布到交換機(jī),交換機(jī)根據(jù)綁定規(guī)則將消息路由到一個(gè)或多個(gè)隊(duì)列;
  • 消費(fèi)者從隊(duì)列中獲取消息并進(jìn)行處理。

這種模型具有高度的靈活性,可以輕松處理大量消息,同時(shí)確保消息的可靠傳遞。

2.3 特性

說到消息中間件,很多人首先想到的就是 Kafka,但 RabbitMQ 也是許多金融或互聯(lián)網(wǎng)公司構(gòu)建可靠、可伸縮和高性能系統(tǒng)的首選。

這是為什么呢?

主要得從 RabbitMQ 的特性說起,主要有二:一個(gè)是功能強(qiáng)大,另一個(gè)是可靠性!

RabbitMQ 注重消息的可靠性和靈活性,適合任務(wù)排隊(duì)和消息傳遞。而 Kafka 是分布式流式平臺(tái),注重日志存儲(chǔ)和數(shù)據(jù)分發(fā)。

順序消費(fèi)也是可靠性的一種,RabbitMQ 可以使用單一隊(duì)列或多個(gè)單一隊(duì)列來確保順序消費(fèi)。

除此之外,RabbitMQ 還提供持久性隊(duì)列和消息,以確保消息在 RabbitMQ 服務(wù)器宕機(jī)后不會(huì)丟失。另外,生產(chǎn)者可以使用發(fā)布確認(rèn)機(jī)制來確認(rèn)消息是否被接收。

RabbitMQ 相對(duì) kafka 可靠性更好,數(shù)據(jù)更不易丟失,這對(duì)于一些數(shù)據(jù)敏感型的業(yè)務(wù)來說,顯然更適合用前者。

并且,RabbitMQ 中原生支持死信隊(duì)列,可以更好地處理未完成的業(yè)務(wù)消息,以及實(shí)現(xiàn)延時(shí)隊(duì)列等特性,接下來我們一一介紹。

3. 保證順序消費(fèi)

RabbitMQ 提供了多個(gè)隊(duì)列模型來保證消息的順序消費(fèi)。這對(duì)于某些應(yīng)用程序非常重要,例如處理訂單、支付和庫存管理。

消息錯(cuò)亂消費(fèi)的場景

圖片圖片

如上圖所示,有三條業(yè)務(wù)消息分別是刪除、增加和修改操作,但是 Consumer 沒有按順序消費(fèi),最終存儲(chǔ)的順序是增加、修改和刪除,就會(huì)發(fā)生數(shù)據(jù)錯(cuò)亂。

針對(duì)消息有序性的問題,RabbitMQ 的解決方法是分三個(gè)階段來保證。

  • 發(fā)送消息:入隊(duì)列

消息發(fā)送時(shí),需要業(yè)務(wù)來保證順序性,就是保證生產(chǎn)者入隊(duì)的順序是有序的。

在分布式的場景下如果難以保證各個(gè)服務(wù)器的入隊(duì)順序,則可以加分布式鎖的方式來解決。或者在業(yè)務(wù)生產(chǎn)方的消息里帶上消息遞增 ID,以及消息產(chǎn)生的時(shí)間戳。

  • 隊(duì)列中的消息

在 RabbitMQ 的消息會(huì)保存在隊(duì)列(Queue)中,在同一個(gè)隊(duì)列里的消息是先進(jìn)先出(FIFO)的,這個(gè)由 RabbitMQ 來幫我們保證順序。

而不同隊(duì)列中的消息,RabbitMQ 無法保證其順序性,就像我們?cè)谑程么蝻堃粯樱驹诓煌呐抨?duì)隊(duì)列,我們也無法保證會(huì)比其他隊(duì)列的人先打上飯。

  • 消費(fèi)消息:出隊(duì)

一般來說,出隊(duì)后的順序消費(fèi)交給消費(fèi)者去保證。我們說的保證消費(fèi)順序,通常也是指消費(fèi)者消費(fèi)消息的順序。

有多個(gè)消費(fèi)者的情況下,通常是無法保證消息順序的。

這就相當(dāng)于我們?cè)谂抨?duì)打飯時(shí),有多個(gè)打飯阿姨,但是每個(gè)阿姨打飯的速度不一致,對(duì)應(yīng)我們消費(fèi)者的消費(fèi)能力也不同。

所以,為了保證消息的順序性,我們可以只使用一個(gè)消費(fèi)者來接收業(yè)務(wù)消息。

就好比只有一個(gè)阿姨在打飯,來得早就一定能早點(diǎn)打上飯。但很明顯,這樣效率不是很高,所以在使用時(shí)我們需要權(quán)衡利弊:看業(yè)務(wù)更需要順序性,還是更需要消費(fèi)效率。

優(yōu)先級(jí)隊(duì)列

在保證順序消費(fèi)時(shí),另一個(gè)迂回策略是可以使用優(yōu)先級(jí)隊(duì)列(Priority Queue)。

在 RabbitMQ3.5 之后,當(dāng)消費(fèi)者數(shù)量較少,如果服務(wù)器檢測到消費(fèi)者不能及時(shí)消費(fèi)消息的情況下,優(yōu)先級(jí)隊(duì)列就會(huì)生效。

具體有兩種優(yōu)先級(jí)策略:

  • 設(shè)置隊(duì)列的優(yōu)先級(jí)
  • 設(shè)置消息的優(yōu)先級(jí)

在聲明隊(duì)列時(shí),我們可以通過 x-max-priority 屬性來設(shè)置隊(duì)列的最大優(yōu)先級(jí),或通過 Priority 屬性來設(shè)置消息的優(yōu)先級(jí),從 1~10。

Golang 實(shí)現(xiàn)代碼如下:

// 隊(duì)列屬性
props := make(map[string]interface{})
// 設(shè)置隊(duì)列最大優(yōu)先級(jí)
props["x-max-priority"] = 10

ch.Publish(
   "tizi365",     // 交換機(jī)
   "", // 路由參數(shù)
   false,
   false,
   amqp.Publishing{
       Priority:5, // 設(shè)置消息優(yōu)先級(jí)
       DeliveryMode:2,  // 消息投遞模式,1代表非持久化,2代表持久化,
       ContentType: "text/plain",
       Body:       []byte(body),
  })

當(dāng)優(yōu)先級(jí)隊(duì)列消費(fèi)生效時(shí),會(huì)首先消費(fèi)高優(yōu)先級(jí)隊(duì)列中的優(yōu)先級(jí)高的消息,以此來實(shí)現(xiàn)順序消費(fèi)。

但需要注意的是,優(yōu)先級(jí)隊(duì)列觸發(fā)的條件比較苛刻,在需要嚴(yán)格保證業(yè)務(wù)消息順序的情況下最好不要使用!

4. 死信隊(duì)列

RabbitMQ 里,當(dāng)消息在隊(duì)列中變成死信(消費(fèi)者無法正常處理的消息)之后,它會(huì)被重新投遞到一個(gè)交換機(jī)上(即死信交換機(jī)),死信交換機(jī)上綁定的消費(fèi)隊(duì)列就是死信隊(duì)列。

圖片圖片

死信的產(chǎn)生

死信產(chǎn)生需要滿足如下條件:

  • 消息被消費(fèi)者手動(dòng)拒絕接收,并且 requeue(重新加入隊(duì)列)策略為 False;
  • 消息已經(jīng)過期(TTL);
  • 隊(duì)列達(dá)到最大長度,消息裝不下了。

死信的處理步驟

當(dāng)死信產(chǎn)生時(shí),如果我們定義了一個(gè)死信交換機(jī)(其實(shí)就是一個(gè)普通的交換機(jī),只是用于處理死信,所以叫死信交換機(jī)),然后在死信交換機(jī)上綁定了一個(gè)隊(duì)列(稱作死信隊(duì)列)。

最后,如果死信隊(duì)列有消費(fèi)者監(jiān)聽時(shí),死信消息的處理就會(huì)和正常業(yè)務(wù)消息一樣,從交換機(jī)到隊(duì)列,再由死信消費(fèi)者(監(jiān)聽死信隊(duì)列的消費(fèi)者)正常消費(fèi)。

5. 延時(shí)隊(duì)列

RabbitMQ 本身不支持延時(shí)隊(duì)列,但是我們可以通過 RabbitMQ 的插件 rabbitmq-delayed-message-exchange,或者使用 死信隊(duì)列 + 消息過期 的方式來實(shí)現(xiàn)。

5.1 應(yīng)用場景

當(dāng)我們?cè)陔娚汤镔徫铮蛘咴?12306 買票時(shí),大概都會(huì)遇到這樣一個(gè)場景:每次下訂單后,到支付訂單中間有一段商品鎖定時(shí)間,超過時(shí)間后未支付訂單就會(huì)關(guān)閉。

狀態(tài)轉(zhuǎn)換圖如下:

圖片圖片

5.2 插件實(shí)現(xiàn)

  • 安裝插件

Github 地址:

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

從 github 的 release 頁面的 assets, 下載 rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez 文件,把文件放到 rabbitmq 插件目錄(plugins目錄)

提示:版本號(hào)可能跟本教程不一樣,如果你的 rabbitmq 就是最新版本,插件也選擇最新版本就行。

  • 激活插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  • 定義交換機(jī)

通過 x-delayed-type 設(shè)置自定義交換機(jī)屬性,支持發(fā)送延遲消息:

props := make(map[string]interface{})
   //關(guān)鍵參數(shù),支持發(fā)送延遲消息
   props["x-delayed-type"] = "direct"

   // 聲明交換機(jī)
   err = ch.ExchangeDeclare(
       "delay.queue",   // 交換機(jī)名字
       "fanout", // 交換機(jī)類型
       true,     // 是否持久化
       false,    
       false,
       false,
       props,      // 設(shè)置屬性
  )
  • 發(fā)送延遲消息

通過消息頭(x-delay),設(shè)置消息延遲時(shí)間。

msgHeaders := make(map[string]interface{})
       // 通過消息頭,設(shè)置消息延遲時(shí)間,單位毫秒
       msgHeaders["x-delay"] = 6000

       err = ch.Publish(
           "delay.queue",     // 交換機(jī)名字
           "", // 路由參數(shù)
           false,
           false,
           amqp.Publishing{
               Headers:msgHeaders, // 設(shè)置消息頭
               ContentType: "text/plain",
               Body:       []byte(body),
          })

5.3 死信隊(duì)列 + 消息過期方案

該方案的核心思想是,先創(chuàng)建死信交換機(jī)、隊(duì)列和消費(fèi)者,來監(jiān)聽死信消息。

然后創(chuàng)建定時(shí)過期的消息,比如訂單支付的時(shí)間為 30min,則將消息的 TTL(最大存活時(shí)間)設(shè)置為 30min,將消息放到一個(gè)沒有消費(fèi)者消費(fèi)的隊(duì)列中,當(dāng)消息過期后就會(huì)成為死信。

死信消息被重新發(fā)送到死信交換機(jī),然后我們?cè)谒佬抨?duì)列中消費(fèi)該消息,根據(jù)商品 ID 判斷該商品是否被支付。

如果沒有支付,就取消訂單,修改訂單狀態(tài)為待下單。如果已經(jīng)支付,就將商品狀態(tài)修改為已完成,并丟掉這條死信消息。

5. 小結(jié)

RabbitMQ 是一個(gè)功能強(qiáng)大的消息中間件,它在許多互聯(lián)網(wǎng)應(yīng)用中扮演了關(guān)鍵角色,比如華為攝像機(jī) SDK 的監(jiān)控圖像數(shù)據(jù)上報(bào),大部分電商系統(tǒng)的異步消費(fèi)等等。

責(zé)任編輯:武曉燕 來源: xin猿意碼
相關(guān)推薦

2021-07-19 11:54:15

MySQL優(yōu)先隊(duì)列

2024-12-25 09:32:06

2023-12-18 09:46:13

Kafka集群開發(fā)

2021-03-16 08:54:35

AQSAbstractQueJava

2011-07-04 10:39:57

Web

2023-04-27 07:43:22

RabbitMQ重試隊(duì)列死信隊(duì)列

2024-04-15 00:00:00

RabbitMQ死信隊(duì)列消息

2021-08-11 07:54:47

Commonjs

2019-01-07 15:29:07

HadoopYarn架構(gòu)調(diào)度器

2021-07-20 15:20:02

FlatBuffers阿里云Java

2017-07-02 18:04:53

塊加密算法AES算法

2012-05-21 10:06:26

FrameworkCocoa

2022-09-26 09:01:15

語言數(shù)據(jù)JavaScript

2023-10-10 13:39:53

Spring隊(duì)列優(yōu)化

2013-09-16 09:56:29

TCP協(xié)議網(wǎng)絡(luò)協(xié)議send

2022-01-11 07:52:22

CSS 技巧代碼重構(gòu)

2025-03-27 09:38:35

2021-04-27 08:54:43

ConcurrentH數(shù)據(jù)結(jié)構(gòu)JDK8

2019-11-11 14:51:19

Java數(shù)據(jù)結(jié)構(gòu)Properties

2022-12-02 09:13:28

SeataAT模式
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)

主站蜘蛛池模板: 欧美成人激情 | 国产精品高清一区二区三区 | 欧美色综合网 | 一级毛片播放 | 综合色久 | 自拍偷拍中文字幕 | 国产欧美一级 | 久久天堂| 国产乱码精品一区二区三区中文 | 亚洲成人av在线播放 | 日韩精品久久久久久 | 黑人性hd| 91佛爷在线观看 | 国产成人一区二区三区 | 在线观看国产www | 精品一区二区在线看 | 久久精品播放 | 久草视频网站 | 国产一区二区欧美 | 午夜影院在线观看视频 | 亚洲视频在线观看 | 99精品一区 | 二区亚洲 | 99久久免费观看 | 国产91精品网站 | 精品国产一区二区在线 | 日日夜夜天天久久 | 欧美日韩亚洲国产综合 | 激情亚洲 | 欧美精品一区二区三区四区 在线 | 一区二区三区亚洲 | 91成人免费观看 | 成人福利网 | 国产高清精品一区二区三区 | 国产超碰人人爽人人做人人爱 | 国产美女在线免费观看 | 97影院在线午夜 | 成人福利视频 | 99精品免费久久久久久日本 | 欧美一级免费看 | 亚洲久视频 |