基于Topic的消息發布與消費模式
閑話
朋友們,好久不見,不知道你們最近怎樣,但相信你們一定都挺好。已經有一段時間沒有更新了,個中原因不好細說,但是歸根結底也許是自己懶。這個不好,大家不要學。今天主要就是想分享一下關于消息處理機制的一些想法。
基本概念
1.Topic
同一個topic下消息的格式一致,例如topic為order-update-message消息的格式都是一個統一的OrderUpdateMessage的結構
2.key主鍵
同一主鍵下的消息列表具有順序性,例如key為訂單號order-0001的消息列表(Queue)下,可能包含的消息列表(Queue)如下:
OrderUpdateMessage(id="msg-0001", orderId = "order-0001", action="create", ...)
OrderUpdateMessage(id="msg-0002", orderId = "order-0001", action="paid", ...)
OrderUpdateMessage(id="msg-0003", orderId = "order-0001", action="delivering", ...)
OrderUpdateMessage(id="msg-0004", orderId = "order-0001", action="modifying", ...)
OrderUpdateMessage(id="msg-0005", orderId = "order-0001", action="delivered", ...)
3.Group消費者組
同一個topic下同一個group下的消費者,對這個group下的消息隊列進行搶占式消費。例如同一個消費者組group-1下的消費者consumer-1和消費者consumer-2,以及另外一個消費者組group-2下的消費者consumer-3,消息消費的結果可能如下:
// consumer-1消費的消息
OrderUpdateMessage(id="msg-0001", orderId = "order-0001", action="create", ...)
OrderUpdateMessage(id="msg-0002", orderId = "order-0001", action="paid", ...)
// consumer-2消費的消息
OrderUpdateMessage(id="msg-0003", orderId = "order-0001", action="modified", ...)
OrderUpdateMessage(id="msg-0004", orderId = "order-0001", action="delivering", ...)
OrderUpdateMessage(id="msg-0005", orderId = "order-0001", action="delivered", ...)
// consumer-3消費的消息
OrderUpdateMessage(id="msg-0001", orderId = "order-0001", action="create", ...)
OrderUpdateMessage(id="msg-0002", orderId = "order-0001", action="paid", ...)
OrderUpdateMessage(id="msg-0003", orderId = "order-0001", action="modified", ...)
OrderUpdateMessage(id="msg-0004", orderId = "order-0001", action="delivering", ...)
OrderUpdateMessage(id="msg-0005", orderId = "order-0001", action="delivered", ...)
Kafka的消息處理機制就是以這樣的形式實現的。
4.優勢
生產者和消費者完全解耦,生產者無需關注是否有消費者在消費,消費者也無需知道生產者是否在生成新的消息。
生產者只關注消息是否成功的發送到消息處理中間件,消費者只關注能否從消息處理中間件消費到消息。
消費者可以按組消費,同組內的消費者進行搶占式消費。
RabbitMq中的優秀實踐
1.RabbitMq消息處理機制
生產者講帶有指定RoutingKey的消息發送到對應的Exchange上,Exchange通過Binding定義的路由規格,將消息按照BindingKey分發到不同的Queue上,消費者從Queue拉取消息消費。
- Exchange & RoutingKey & Topic:RoutingKey決定了消息會被發送到哪個Exchange上,這和topic是類似的概念。
- Bind & BindingKey & Group:Exchange根據Binding定義的路由規格,將消息按照BindingKey分發到不同的Queue上,這里可以認為是對應了Group的概念。
- Queue & Group:Queue則是維護了一個Group下的某個隊列下的所有消息。
優秀實踐
因此如果要以RabbitMq實現基于Topic和Group實現的消息生產和消費的機制,可以將消息定義成以下類似的結構:
// Exchange: {value="order-update", type="fanout"}
// binding1: {value="promotion-service", bindingKey="order.*.paid"}
// binding2: {value='inventory-service', bindingKey="order.*"}
OrderUpdateMessage(id="msg-0001", orderId = "order-0001", action="create", ...)
OrderUpdateMessage(id="msg-0002", orderId = "order-0001", action="paid", ...)
OrderUpdateMessage(id="msg-0003", orderId = "order-0001", action="modified", ...)
OrderUpdateMessage(id="msg-0004", orderId = "order-0001", action="delivering", ...)
OrderUpdateMessage(id="msg-0005", orderId = "order-0001", action="delivered", ...)
OrderUpdateMessage(id="msg-0006", orderId = "order-0002", action="paid", ...)
假設此時有promotion-service(1個實例)和inventory-service(2個實例)兩個消費者消費消息,則對應的消息消費的結果可能是:
// inventory-service
// Exchange: {value="order-update", type="fanout"}
// QueueBinding: {value=Queue('inventory-service'), bindingKey="order.*"}
// inventory-service實例1消費到的消息
OrderUpdateMessage(id="msg-0001", orderId = "order-0001", action="create", ...)
OrderUpdateMessage(id="msg-0002", orderId = "order-0001", action="paid", ...)
OrderUpdateMessage(id="msg-0006", orderId = "order-0002", action="paid", ...)
// inventory-service實例2消費到的消息
OrderUpdateMessage(id="msg-0003", orderId = "order-0001", action="modified", ...)
OrderUpdateMessage(id="msg-0004", orderId = "order-0001", action="delivering", ...)
OrderUpdateMessage(id="msg-0005", orderId = "order-0001", action="delivered", ...)
// promotion-service
// Exchange: {value="order-update", type="fanout"}
// QueueBinding: {value="promotion-service", bindingKey="order.*.paid"}
// promotion-service實例1消費到的消息
OrderUpdateMessage(id="msg-0002", orderId = "order-0001", action="paid", ...)
OrderUpdateMessage(id="msg-0006", orderId = "order-0002", action="paid", ...)
總結
RabbitMQ的Exchange支持不同類型(Direct, Fanout, Topic, Headers),以及Binding可以對消息以更靈活的通配符的方式將消息分發到對應的Queue上,因此其消息處理機制更加靈活。
基于Topic的消息發布與消費模式,能夠將消費者和生產者完全解耦,相對RabbitMQ中的所支持的靈活處理消息的方式,更加簡單且易于理解,這也是Kafka的消息處理機制。
通過對比不同的中間件的消息處理機制也許能找到更好的實踐方式。