RabbitMQ工作模式-Publish/Subscribe發(fā)布與訂閱模式
訂閱模式類型
訂閱模式示例圖:
前面2個(gè)案例中,只有3個(gè)角色:
- P:生產(chǎn)者,也就是要發(fā)送消息的程序
- C:消費(fèi)者:消息的接受者,會(huì)一直等待消息到來。
- queue:消息隊(duì)列,圖中紅色部分
而在訂閱模型中,多了一個(gè)exchange角色,而且過程略有變化:
- P:生產(chǎn)者,也就是要發(fā)送消息的程序,但是不再發(fā)送到隊(duì)列中,而是發(fā)給X(交換機(jī))
- C:消費(fèi)者,消息的接受者,會(huì)一直等待消息到來。
- Queue:消息隊(duì)列,接收消息、緩存消息。
- Exchange:交換機(jī),圖中的X。一方面,接收生產(chǎn)者發(fā)送的消息。另一方面,知道如何處理消息,例如遞交給某個(gè)特別隊(duì)列、遞交給所有隊(duì)列、或是將消息丟棄。到底如何操作,取決于Exchange的類型。Exchange有常見以下3種類型:
- Fanout:廣播,將消息交給所有綁定到交換機(jī)的隊(duì)列
- Direct:定向,把消息交給符合指定routing key 的隊(duì)列
- Topic:通配符,把消息交給符合routing pattern(路由模式) 的隊(duì)列
Exchange(交換機(jī))只負(fù)責(zé)轉(zhuǎn)發(fā)消息,不具備存儲(chǔ)消息的能力,因此如果沒有任何隊(duì)列與Exchange綁定,或者沒有符合路由規(guī)則的隊(duì)列,那么消息會(huì)丟失!
Publish/Subscribe發(fā)布與訂閱模式
1、模式說明
發(fā)布訂閱模式:
每個(gè)消費(fèi)者監(jiān)聽自己的隊(duì)列。
生產(chǎn)者將消息發(fā)給broker,由交換機(jī)將消息轉(zhuǎn)發(fā)到綁定此交換機(jī)的每個(gè)隊(duì)列,每個(gè)綁定交換機(jī)的隊(duì)列都將接收 到消息
2、案例
(1)生產(chǎn)者
package com.lijw.producer;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author Aron.li
* @date 2022/3/3 8:16
*/
public class Producer_PubSub {
public static void main(String[] args) throws IOException, TimeoutException {
//1.創(chuàng)建連接工廠
ConnectionFactory factory = new ConnectionFactory();
//2. 設(shè)置參數(shù)
factory.setHost("127.0.0.1"); // ip 默認(rèn)值 localhost
factory.setPort(5672); //端口 默認(rèn)值 5672
factory.setVirtualHost("/test"); //虛擬機(jī) 默認(rèn)值 /
factory.setUsername("libai"); // 用戶名 默認(rèn) guest
factory.setPassword("libai"); //密碼 默認(rèn)值 guest
//3. 創(chuàng)建連接 Connection
Connection connection = factory.newConnection();
//4. 創(chuàng)建Channel
Channel channel = connection.createChannel();
//5. 創(chuàng)建交換機(jī)
/*
exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
參數(shù):
1. exchange:交換機(jī)名稱
2. type:交換機(jī)類型
DIRECT("direct"):定向
FANOUT("fanout"):扇形(廣播),發(fā)送消息到每一個(gè)與之綁定隊(duì)列。
TOPIC("topic") 通配符的方式
HEADERS("headers") 參數(shù)匹配
3. durable:是否持久化
4. autoDelete:自動(dòng)刪除
5. internal:內(nèi)部使用。 一般false
6. arguments:參數(shù)
*/
String exchangeName = "test_fanout";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true, false, false, null);
//6. 創(chuàng)建隊(duì)列
String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";
channel.queueDeclare(queue1Name, true, false, false, null);
channel.queueDeclare(queue2Name, true, false, false, null);
// 7. 綁定隊(duì)列和交換機(jī)
/*
queueBind(String queue, String exchange, String routingKey)
參數(shù):
1. queue:隊(duì)列名稱
2. exchange:交換機(jī)名稱
3. routingKey:路由鍵,綁定規(guī)則
如果交換機(jī)的類型為fanout ,routingKey設(shè)置為""
*/
channel.queueBind(queue1Name, exchangeName, "");
channel.queueBind(queue2Name, exchangeName, "");
//8. 發(fā)送消息至交換機(jī),由交換機(jī)分發(fā)消息
String body = "日志信息: 肥仔白調(diào)用了findAll方法...日志級(jí)別: INFO....";
channel.basicPublish(exchangeName, "", null, body.getBytes());
//9. 釋放資源
channel.close();
connection.close();
}
}
執(zhí)行生產(chǎn)者,我們可以查看一下創(chuàng)建的 交換機(jī) 以及 隊(duì)列信息:
下面再來看看隊(duì)列,如下:
下面我們繼續(xù)來寫兩個(gè)消費(fèi)者接收消息。
(2)消費(fèi)者1:讀取隊(duì)列1的消息
package com.lijw.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author Aron.li
* @date 2022/3/2 16:16
*/
public class Consumer_PubSub1 {
//定義接收隊(duì)列的名稱
final static String queueName = "test_fanout_queue1";
public static void main(String[] args) throws IOException, TimeoutException {
//1.創(chuàng)建連接工廠
ConnectionFactory factory = new ConnectionFactory();
//2. 設(shè)置參數(shù)
factory.setHost("127.0.0.1"); // ip 默認(rèn)值 localhost
factory.setPort(5672); //端口 默認(rèn)值 5672
factory.setVirtualHost("/test"); //虛擬機(jī) 默認(rèn)值 /
factory.setUsername("libai"); // 用戶名 默認(rèn) guest
factory.setPassword("libai"); //密碼 默認(rèn)值 guest
//3. 創(chuàng)建連接 Connection
Connection connection = factory.newConnection();
//4. 創(chuàng)建Channel
Channel channel = connection.createChannel();
//5. 創(chuàng)建隊(duì)列Queue
/*
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
參數(shù):
1. queue:隊(duì)列名稱
2. durable:是否持久化,當(dāng)mq重啟之后,還在
3. exclusive:
* 是否獨(dú)占。只能有一個(gè)消費(fèi)者監(jiān)聽這隊(duì)列
* 當(dāng)Connection關(guān)閉時(shí),是否刪除隊(duì)列
4. autoDelete:是否自動(dòng)刪除。當(dāng)沒有Consumer時(shí),自動(dòng)刪除掉
5. arguments:參數(shù)。
*/
channel.queueDeclare(queueName, true, false, false, null);
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
參數(shù):
1. queue:隊(duì)列名稱
2. autoAck:是否自動(dòng)確認(rèn)
3. callback:回調(diào)對(duì)象
*/
// 接收消息
Consumer consumer = new DefaultConsumer(channel){
/*
回調(diào)方法,當(dāng)收到消息后,會(huì)自動(dòng)執(zhí)行該方法
1. consumerTag:標(biāo)識(shí)
2. envelope:獲取一些信息,交換機(jī),路由key...
3. properties:配置信息
4. body:數(shù)據(jù)
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收隊(duì)列的數(shù)據(jù) body: " + new String(body));
}
};
channel.basicConsume(queueName,true,consumer);
//不需要關(guān)閉資源,因?yàn)橄M(fèi)者需要持續(xù)監(jiān)聽隊(duì)列信息
}
}
(3)消費(fèi)者2:讀取隊(duì)列2的消息
package com.lijw.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author Aron.li
* @date 2022/3/2 16:16
*/
public class Consumer_PubSub2 {
//定義接收隊(duì)列的名稱
final static String queueName = "test_fanout_queue2";
public static void main(String[] args) throws IOException, TimeoutException {
//1.創(chuàng)建連接工廠
ConnectionFactory factory = new ConnectionFactory();
//2. 設(shè)置參數(shù)
factory.setHost("127.0.0.1"); // ip 默認(rèn)值 localhost
factory.setPort(5672); //端口 默認(rèn)值 5672
factory.setVirtualHost("/test"); //虛擬機(jī) 默認(rèn)值 /
factory.setUsername("libai"); // 用戶名 默認(rèn) guest
factory.setPassword("libai"); //密碼 默認(rèn)值 guest
//3. 創(chuàng)建連接 Connection
Connection connection = factory.newConnection();
//4. 創(chuàng)建Channel
Channel channel = connection.createChannel();
//5. 創(chuàng)建隊(duì)列Queue
/*
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
參數(shù):
1. queue:隊(duì)列名稱
2. durable:是否持久化,當(dāng)mq重啟之后,還在
3. exclusive:
* 是否獨(dú)占。只能有一個(gè)消費(fèi)者監(jiān)聽這隊(duì)列
* 當(dāng)Connection關(guān)閉時(shí),是否刪除隊(duì)列
4. autoDelete:是否自動(dòng)刪除。當(dāng)沒有Consumer時(shí),自動(dòng)刪除掉
5. arguments:參數(shù)。
*/
channel.queueDeclare(queueName, true, false, false, null);
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
參數(shù):
1. queue:隊(duì)列名稱
2. autoAck:是否自動(dòng)確認(rèn)
3. callback:回調(diào)對(duì)象
*/
// 接收消息
Consumer consumer = new DefaultConsumer(channel){
/*
回調(diào)方法,當(dāng)收到消息后,會(huì)自動(dòng)執(zhí)行該方法
1. consumerTag:標(biāo)識(shí)
2. envelope:獲取一些信息,交換機(jī),路由key...
3. properties:配置信息
4. body:數(shù)據(jù)
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收隊(duì)列的數(shù)據(jù) body: " + new String(body));
}
};
channel.basicConsume(queueName,true,consumer);
//不需要關(guān)閉資源,因?yàn)橄M(fèi)者需要持續(xù)監(jiān)聽隊(duì)列信息
}
}
3、測(cè)試
啟動(dòng)所有消費(fèi)者,然后使用生產(chǎn)者發(fā)送消息;在每個(gè)消費(fèi)者對(duì)應(yīng)的控制臺(tái)可以查看到生產(chǎn)者發(fā)送的所有消息;到達(dá)廣播的效果。
- 消費(fèi)者1接收到的消息:
- 消費(fèi)者2接收到的消息:
從結(jié)果來看,生產(chǎn)者只需要發(fā)送一條消息,其余的消費(fèi)者全部收到了消息,達(dá)到了廣播的效果。
4、小結(jié)
交換機(jī)需要與隊(duì)列進(jìn)行綁定,綁定之后;一個(gè)消息可以被多個(gè)消費(fèi)者都收到。
發(fā)布訂閱模式與工作隊(duì)列模式的區(qū)別:
- 工作隊(duì)列模式不用定義交換機(jī),而發(fā)布/訂閱模式需要定義交換機(jī)。
- 發(fā)布/訂閱模式的生產(chǎn)方是面向交換機(jī)發(fā)送消息,工作隊(duì)列模式的生產(chǎn)方是面向隊(duì)列發(fā)送消息(底層使用默認(rèn)交換機(jī))。
- 發(fā)布/訂閱模式需要設(shè)置隊(duì)列和交換機(jī)的綁定,工作隊(duì)列模式不需要設(shè)置,實(shí)際上工作隊(duì)列模式會(huì)將隊(duì)列綁 定到默認(rèn)的交換機(jī) 。