成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

基于Topic的消息發布與消費模式

開發
基于Topic的消息發布與消費模式,能夠將消費者和生產者完全解耦,相對RabbitMQ中的所支持的靈活處理消息的方式,更加簡單且易于理解,這也是Kafka的消息處理機制。

閑話

朋友們,好久不見,不知道你們最近怎樣,但相信你們一定都挺好。已經有一段時間沒有更新了,個中原因不好細說,但是歸根結底也許是自己懶。這個不好,大家不要學。今天主要就是想分享一下關于消息處理機制的一些想法。

基本概念

1.Topic

同一個topic下消息的格式一致,例如topic為order-update-message消息的格式都是一個統一的OrderUpdateMessage的結構

2.key主鍵

同一主鍵下的消息列表具有順序性,例如key為訂單號order-0001的消息列表(Queue)下,可能包含的消息列表(Queue)如下:

OrderUpdateMessage(id="msg-0001", orderId = "order-0001", action="create", ...)
OrderUpdateMessage(id="msg-0002", orderId = "order-0001", action="paid", ...)
OrderUpdateMessage(id="msg-0003", orderId = "order-0001", action="delivering", ...)
OrderUpdateMessage(id="msg-0004", orderId = "order-0001", action="modifying", ...)
OrderUpdateMessage(id="msg-0005", orderId = "order-0001", action="delivered", ...)

3.Group消費者組

同一個topic下同一個group下的消費者,對這個group下的消息隊列進行搶占式消費。例如同一個消費者組group-1下的消費者consumer-1和消費者consumer-2,以及另外一個消費者組group-2下的消費者consumer-3,消息消費的結果可能如下:

// consumer-1消費的消息
OrderUpdateMessage(id="msg-0001", orderId = "order-0001", action="create", ...)
OrderUpdateMessage(id="msg-0002", orderId = "order-0001", action="paid", ...)

// consumer-2消費的消息
OrderUpdateMessage(id="msg-0003", orderId = "order-0001", action="modified", ...)
OrderUpdateMessage(id="msg-0004", orderId = "order-0001", action="delivering", ...)
OrderUpdateMessage(id="msg-0005", orderId = "order-0001", action="delivered", ...)

// consumer-3消費的消息
OrderUpdateMessage(id="msg-0001", orderId = "order-0001", action="create", ...)
OrderUpdateMessage(id="msg-0002", orderId = "order-0001", action="paid", ...)
OrderUpdateMessage(id="msg-0003", orderId = "order-0001", action="modified", ...)
OrderUpdateMessage(id="msg-0004", orderId = "order-0001", action="delivering", ...)
OrderUpdateMessage(id="msg-0005", orderId = "order-0001", action="delivered", ...)

Kafka的消息處理機制就是以這樣的形式實現的。

4.優勢 

生產者和消費者完全解耦,生產者無需關注是否有消費者在消費,消費者也無需知道生產者是否在生成新的消息。

生產者只關注消息是否成功的發送到消息處理中間件,消費者只關注能否從消息處理中間件消費到消息。

消費者可以按組消費,同組內的消費者進行搶占式消費。

RabbitMq中的優秀實踐

1.RabbitMq消息處理機制

生產者講帶有指定RoutingKey的消息發送到對應的Exchange上,Exchange通過Binding定義的路由規格,將消息按照BindingKey分發到不同的Queue上,消費者從Queue拉取消息消費。

  • Exchange & RoutingKey & Topic:RoutingKey決定了消息會被發送到哪個Exchange上,這和topic是類似的概念。
  • Bind & BindingKey & Group:Exchange根據Binding定義的路由規格,將消息按照BindingKey分發到不同的Queue上,這里可以認為是對應了Group的概念。
  • Queue & Group:Queue則是維護了一個Group下的某個隊列下的所有消息。

優秀實踐

因此如果要以RabbitMq實現基于Topic和Group實現的消息生產和消費的機制,可以將消息定義成以下類似的結構:

// Exchange: {value="order-update", type="fanout"}
// binding1: {value="promotion-service", bindingKey="order.*.paid"}
// binding2: {value='inventory-service', bindingKey="order.*"}
OrderUpdateMessage(id="msg-0001", orderId = "order-0001", action="create", ...)
OrderUpdateMessage(id="msg-0002", orderId = "order-0001", action="paid", ...)
OrderUpdateMessage(id="msg-0003", orderId = "order-0001", action="modified", ...)
OrderUpdateMessage(id="msg-0004", orderId = "order-0001", action="delivering", ...)
OrderUpdateMessage(id="msg-0005", orderId = "order-0001", action="delivered", ...)
OrderUpdateMessage(id="msg-0006", orderId = "order-0002", action="paid", ...)

假設此時有promotion-service(1個實例)和inventory-service(2個實例)兩個消費者消費消息,則對應的消息消費的結果可能是:

// inventory-service
// Exchange: {value="order-update", type="fanout"}
// QueueBinding: {value=Queue('inventory-service'), bindingKey="order.*"}
// inventory-service實例1消費到的消息
OrderUpdateMessage(id="msg-0001", orderId = "order-0001", action="create", ...)
OrderUpdateMessage(id="msg-0002", orderId = "order-0001", action="paid", ...)
OrderUpdateMessage(id="msg-0006", orderId = "order-0002", action="paid", ...)
// inventory-service實例2消費到的消息
OrderUpdateMessage(id="msg-0003", orderId = "order-0001", action="modified", ...)
OrderUpdateMessage(id="msg-0004", orderId = "order-0001", action="delivering", ...)
OrderUpdateMessage(id="msg-0005", orderId = "order-0001", action="delivered", ...)

// promotion-service
// Exchange: {value="order-update", type="fanout"}
// QueueBinding: {value="promotion-service", bindingKey="order.*.paid"}
// promotion-service實例1消費到的消息
OrderUpdateMessage(id="msg-0002", orderId = "order-0001", action="paid", ...)
OrderUpdateMessage(id="msg-0006", orderId = "order-0002", action="paid", ...)

總結

RabbitMQ的Exchange支持不同類型(Direct, Fanout, Topic, Headers),以及Binding可以對消息以更靈活的通配符的方式將消息分發到對應的Queue上,因此其消息處理機制更加靈活。

基于Topic的消息發布與消費模式,能夠將消費者和生產者完全解耦,相對RabbitMQ中的所支持的靈活處理消息的方式,更加簡單且易于理解,這也是Kafka的消息處理機制。

通過對比不同的中間件的消息處理機制也許能找到更好的實踐方式。

責任編輯:趙寧寧 來源: FrenziedJavaLand
相關推薦

2022-10-28 13:33:05

Push模式互聯網高并發

2024-06-05 06:37:19

2022-12-13 08:39:53

Kafka存儲檢索

2023-08-24 09:01:25

消息拉取RocketMQ

2020-07-27 08:44:22

存儲Kafka 流程

2024-11-11 13:28:11

RocketMQ消息類型FIFO

2023-11-10 09:22:06

2022-09-25 12:53:36

RocketMQtopic

2023-06-29 10:10:06

Rocket MQ消息中間件

2024-07-29 08:34:18

C++訂閱者模式線程

2023-12-04 08:24:23

2021-06-04 11:33:50

消息技巧排查

2021-03-01 07:31:53

消息支付高可用

2021-06-01 06:59:58

運維Jira設計

2023-11-27 17:29:43

Kafka全局順序性

2019-12-19 15:09:19

微信深色模式消息引用

2009-01-15 10:43:21

SCDMA通信產業McWiLL

2013-06-04 17:14:42

戴爾

2024-03-20 08:33:00

Kafka線程安全Rebalance

2024-12-18 07:43:49

點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 成年人免费网站 | 精品国产欧美一区二区三区成人 | 欧美日韩免费一区二区三区 | 欧美成人第一页 | 国产毛片毛片 | 日批免费看 | 国产资源在线播放 | 视频在线观看一区 | 精品一区二区三区四区 | 日日夜夜天天 | 免费视频成人国产精品网站 | 国产第一页在线观看 | 久久在线视频 | 天天躁日日躁性色aⅴ电影 免费在线观看成年人视频 国产欧美精品 | 91精品久久久久久久久 | av在线一区二区三区 | 欧美性生活视频 | 这里只有精品999 | 日韩视频精品在线 | 亚洲国产精品视频 | 人人操日日干 | 亚洲图片一区二区三区 | 色av一区| 作爱视频免费观看 | 午夜精品一区二区三区在线视 | 超碰欧美 | 嫩呦国产一区二区三区av | 精品国产一级片 | 青草青草久热精品视频在线观看 | 亚洲欧美激情国产综合久久久 | 成人性生交a做片 | 久久久免费少妇高潮毛片 | 日韩欧美三级电影在线观看 | 91九色porny首页最多播放 | 国产一区二区三区免费 | 成人国产精品久久久 | 日韩精品一区二区三区中文在线 | 色www精品视频在线观看 | 国产精品不卡 | 欧美日韩免费在线 | 国产精品久久二区 |