RabbitMQ工作模式-Routing路由模式
作者:海洋的漁夫
Routing模式要求隊列在綁定交換機時要指定Routing key,消息會轉發到符合Routing key的隊列。
Routing路由模式
1、模式說明
路由模式特點:
- 隊列與交換機的綁定,不能是任意綁定了,而是要指定一個RoutingKey(路由key)。
- 消息的發送方在 向 Exchange發送消息時,也必須指定消息的 RoutingKey。
- Exchange不再把消息交給每一個綁定的隊列,而是根據消息的Routing Key進行判斷,只有隊列的Routingkey與消息的 Routing key完全一致,才會接收到消息。
圖解:
- P:生產者,向Exchange發送消息,發送消息時,會指定一個routing key。
- X:Exchange(交換機),接收生產者的消息,然后把消息遞交給 與routing key完全匹配的隊列
- C1:消費者,其所在隊列指定了需要routing key 為 error 的消息
- C2:消費者,其所在隊列指定了需要routing key 為 info、error、warning 的消息
2。案例
在編碼上與 Publish/Subscribe發布與訂閱模式 的區別是交換機的類型為:Direct,還有隊列綁定交換機的時候需要指定routing key。
在寫案例之前,我們首先定義一下需求:
- 生產者:發送兩條消息,一條消息的用于插入數據,另一條消息用于更新數據。
- 消費者1:接收插入數據的消息,進行數據插入。
- 消費者2:接收更新數據的消息,進行數據更新。
(1)生產者
package com.lijw.producer;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author Aron.li
* @date 2022/3/3 8:16
*/
public class Producer_Routing {
//交換機名稱
static final String DIRECT_EXCHAGE = "direct_exchange";
//隊列名稱
static final String DIRECT_QUEUE_INSERT = "direct_queue_insert";
//隊列名稱
static final String DIRECT_QUEUE_UPDATE = "direct_queue_update";
public static void main(String[] args) throws IOException, TimeoutException {
//1.創建連接工廠
ConnectionFactory factory = new ConnectionFactory();
//2. 設置參數
factory.setHost("127.0.0.1"); // ip 默認值 localhost
factory.setPort(5672); //端口 默認值 5672
factory.setVirtualHost("/test"); //虛擬機 默認值 /
factory.setUsername("libai"); // 用戶名 默認 guest
factory.setPassword("libai"); //密碼 默認值 guest
//3. 創建連接 Connection
Connection connection = factory.newConnection();
//4. 創建Channel
Channel channel = connection.createChannel();
//5. 創建交換機
/*
exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
參數:
1. exchange:交換機名稱
2. type:交換機類型
DIRECT("direct"):定向
FANOUT("fanout"):扇形(廣播),發送消息到每一個與之綁定隊列。
TOPIC("topic") 通配符的方式
HEADERS("headers") 參數匹配
3. durable:是否持久化
4. autoDelete:自動刪除
5. internal:內部使用。 一般false
6. arguments:參數
*/
channel.exchangeDeclare(DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT, true, false, false, null);
// 6.聲明(創建)隊列
/**
* 參數1:隊列名稱
* 參數2:是否定義持久化隊列
* 參數3:是否獨占本次連接
* 參數4:是否在不使用的時候自動刪除隊列
* 參數5:隊列其它參數
*/
channel.queueDeclare(DIRECT_QUEUE_INSERT, true, false, false, null);
channel.queueDeclare(DIRECT_QUEUE_UPDATE, true, false, false, null);
// 7. 綁定隊列和交換機
/*
queueBind(String queue, String exchange, String routingKey)
參數:
1. queue:隊列名稱
2. exchange:交換機名稱
3. routingKey:路由鍵,綁定規則
如果交換機的類型為fanout ,routingKey設置為""
*/
channel.queueBind(DIRECT_QUEUE_INSERT, DIRECT_EXCHAGE, "insert");
channel.queueBind(DIRECT_QUEUE_UPDATE, DIRECT_EXCHAGE, "update");
//8. 發送消息至交換機,由交換機分發消息
// 發送信息
String message = "新增了商品。路由模式;routing key 為 insert " ;
/**
* 參數1:交換機名稱,如果沒有指定則使用默認Default Exchage
* 參數2:路由key,簡單模式可以傳遞隊列名稱
* 參數3:消息其它屬性
* 參數4:消息內容
*/
channel.basicPublish(DIRECT_EXCHAGE, "insert", null, message.getBytes());
System.out.println("已發送消息:" + message);
// 發送信息
message = "修改了商品。路由模式;routing key 為 update" ;
/**
* 參數1:交換機名稱,如果沒有指定則使用默認Default Exchage
* 參數2:路由key,簡單模式可以傳遞隊列名稱
* 參數3:消息其它屬性
* 參數4:消息內容
*/
channel.basicPublish(DIRECT_EXCHAGE, "update", null, message.getBytes());
System.out.println("已發送消息:" + message);
//9. 釋放資源
channel.close();
connection.close();
}
}
執行發送消息:
發送消息之后,我們來看看聲明好的交換機:
(2)消費者1:專門接收 insert 的消息
package com.lijw.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author Aron.li
* @date 2022/3/2 16:16
*/
public class Consumer_Routing1 {
//隊列名稱
static final String DIRECT_QUEUE_INSERT = "direct_queue_insert";
public static void main(String[] args) throws IOException, TimeoutException {
//1.創建連接工廠
ConnectionFactory factory = new ConnectionFactory();
//2. 設置參數
factory.setHost("127.0.0.1"); // ip 默認值 localhost
factory.setPort(5672); //端口 默認值 5672
factory.setVirtualHost("/test"); //虛擬機 默認值 /
factory.setUsername("libai"); // 用戶名 默認 guest
factory.setPassword("libai"); //密碼 默認值 guest
//3. 創建連接 Connection
Connection connection = factory.newConnection();
//4. 創建Channel
Channel channel = connection.createChannel();
//5. 創建隊列Queue
/*
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
參數:
1. queue:隊列名稱
2. durable:是否持久化,當mq重啟之后,還在
3. exclusive:
* 是否獨占。只能有一個消費者監聽這隊列
* 當Connection關閉時,是否刪除隊列
4. autoDelete:是否自動刪除。當沒有Consumer時,自動刪除掉
5. arguments:參數。
*/
channel.queueDeclare(DIRECT_QUEUE_INSERT, true, false, false, null);
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
參數:
1. queue:隊列名稱
2. autoAck:是否自動確認
3. callback:回調對象
*/
// 接收消息
Consumer consumer = new DefaultConsumer(channel){
/*
回調方法,當收到消息后,會自動執行該方法
1. consumerTag:標識
2. envelope:獲取一些信息,交換機,路由key...
3. properties:配置信息
4. body:數據
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收隊列的數據 body: " + new String(body));
}
};
channel.basicConsume(DIRECT_QUEUE_INSERT,true,consumer);
//不需要關閉資源,因為消費者需要持續監聽隊列信息
}
}
(3)消費者2:專門接收 update 的消息
package com.lijw.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author Aron.li
* @date 2022/3/2 16:16
*/
public class Consumer_Routing2 {
//隊列名稱
static final String DIRECT_QUEUE_UPDATE = "direct_queue_update";
public static void main(String[] args) throws IOException, TimeoutException {
//1.創建連接工廠
ConnectionFactory factory = new ConnectionFactory();
//2. 設置參數
factory.setHost("127.0.0.1"); // ip 默認值 localhost
factory.setPort(5672); //端口 默認值 5672
factory.setVirtualHost("/test"); //虛擬機 默認值 /
factory.setUsername("libai"); // 用戶名 默認 guest
factory.setPassword("libai"); //密碼 默認值 guest
//3. 創建連接 Connection
Connection connection = factory.newConnection();
//4. 創建Channel
Channel channel = connection.createChannel();
//5. 創建隊列Queue
/*
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
參數:
1. queue:隊列名稱
2. durable:是否持久化,當mq重啟之后,還在
3. exclusive:
* 是否獨占。只能有一個消費者監聽這隊列
* 當Connection關閉時,是否刪除隊列
4. autoDelete:是否自動刪除。當沒有Consumer時,自動刪除掉
5. arguments:參數。
*/
channel.queueDeclare(DIRECT_QUEUE_UPDATE, true, false, false, null);
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
參數:
1. queue:隊列名稱
2. autoAck:是否自動確認
3. callback:回調對象
*/
// 接收消息
Consumer consumer = new DefaultConsumer(channel){
/*
回調方法,當收到消息后,會自動執行該方法
1. consumerTag:標識
2. envelope:獲取一些信息,交換機,路由key...
3. properties:配置信息
4. body:數據
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收隊列的數據 body: " + new String(body));
}
};
channel.basicConsume(DIRECT_QUEUE_UPDATE,true,consumer);
//不需要關閉資源,因為消費者需要持續監聽隊列信息
}
}
3、測試
啟動所有消費者,然后使用生產者發送消息;在消費者對應的控制臺可以查看到生產者發送對應routing key對應隊列的消息;到達按照需要接收的效果。
- 消費者1 收到了 insert 的消息
- 消費者2 收到了 update 的消息
4、小結
Routing模式要求隊列在綁定交換機時要指定routing key,消息會轉發到符合routing key的隊列。
責任編輯:姜華
來源:
今日頭條