成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

SpringBoot整合RabbitMQ保證消息的可靠的投遞及消費

開發(fā) 架構(gòu)
Spring.rabbitmq.publisher-confirm-type及spring.rabbitmq.publisher-returns 的配置值。

環(huán)境:SpringBoot2.7.9

消息丟失場景

  1. 生產(chǎn)者丟失消息
    生產(chǎn)者發(fā)出的數(shù)據(jù)由于網(wǎng)絡(luò)原因沒有到底MQ Server丟失
  2. MQ Server丟消息
    由于消息隊列沒有持久化或者是消息沒有持久化,在Server重啟后消息丟失
  3. 消費者丟消息
    接收到消息后,業(yè)務(wù)還沒有處理完成,服務(wù)宕機(當(dāng)你是自動ACK)。

生產(chǎn)者丟失解決方案

  1. 通過事務(wù)(不推薦)
  2. 確認(rèn)機制(推薦)

這里只講如何通過確認(rèn)機制保證生產(chǎn)者不丟失消息

  • 引入依賴

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

  • 聲明交換機及隊列

@Bean
public TopicExchange topicExchange() {
return new TopicExchange("akf.exchange", true, false) ;
}
@Bean
public Queue queue() {
return new Queue("akf.queue", true, false, false) ;
}
@Bean
public Binding binding() {
return BindingBuilder.bind(queue()).to(topicExchange()).with("akf.#") ;
}

  • RabbitMQ配置

spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtualHost: test
publisherConfirmType: correlated
publisherReturns: true
template:
mandatory: true

注意:spring.rabbitmq.publisher-confirm-type及spring.rabbitmq.publisher-returns 的配置值。

接下來是為RabbitTemplate配置對應(yīng)的Callback,Publisher確認(rèn)回調(diào),Publisher返回回調(diào)。

  1. 確認(rèn)回調(diào)
    當(dāng)消息發(fā)送到了交換機則ack=true,當(dāng)消息無法發(fā)送到交換機則ack=false。
  2. 返回回調(diào)
    當(dāng)消息能夠發(fā)送到交換機,但是不能路由到隊列則會調(diào)用該return回調(diào)。

RabbitTemplate是單例的可以通過兩種方式配置對應(yīng)的回調(diào)。

  1. 自定義RabbitTemplate。
  2. 通過AWare接口獲取RabbitTemplate配置。

這里只講通過AWare接口配置回調(diào)。

  • 配置Callback

@Component
public class ConfigRabbitTemplate implements ApplicationContextAware {

@Override
public void setApplicationContext(ApplicationContext context) throws BeansException {
RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class) ;
rabbitTemplate.setConfirmCallback(new ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("correlation: " + correlationData) ;
if (ack) {
System.out.println("消息發(fā)送到交換機") ;
} else {
System.out.println("消息發(fā)送失敗 - " + ", cause" + cause) ;
}
}
});
rabbitTemplate.setReturnsCallback(new ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returned) {
System.out.println(returned.getExchange() + ", " + returned.getRoutingKey() + ", " + returned.getReplyCode() + ", " + returned.getMessage().toString()) ;
}
});
}

}

使用錯誤的交換機和錯誤的路由key分別測試即可以看到上面的輸出信息了。

MQ Server丟消息

在通過@Bean聲明交換機和隊列時設(shè)置持久性,在消息上設(shè)置持久化。

@Bean
public TopicExchange topicExchange() {
// 這里的第二個參數(shù)就是設(shè)置是否持久化,如果設(shè)置為false,當(dāng)服務(wù)重啟交換機將丟失
// 第三個參數(shù)是否自動刪除,當(dāng)不再使用該交換機時會自動刪除該交換機
return new TopicExchange("akf.exchange", true, false) ;
}
@Bean
public Queue queue() {
// 第二個參數(shù)true設(shè)置隊列是持久化的,當(dāng)服務(wù)重啟隊列不會丟失
return new Queue("akf.queue", true, false, false) ;
}

設(shè)置消息持久化。

Message message = MessageBuilder.withBody("Hello".getBytes())
// 設(shè)置消息投遞模式為持久化的(默認(rèn)不設(shè)置就是持久化的)
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.build() ;

消費者丟消息

關(guān)閉自動應(yīng)答機制。

默認(rèn)是自動應(yīng)答,當(dāng)消息監(jiān)聽方法中沒有異常時則正常應(yīng)答,當(dāng)發(fā)生異常時,在默認(rèn)情況下會重新入隊列(這樣就會出現(xiàn)死循環(huán))。

spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtualHost: test
publisherConfirmType: correlated
publisherReturns: true
listener:
simple:
acknowledgeMode: manual #設(shè)置為手動應(yīng)答

消息監(jiān)聽。

@RabbitListener(queues = {"akf.queue"})
public void onMessage(Message message, Channel channel) throws Exception {
try {
System.out.println("接收到消息: " + new String(message.getBody()));
// ... 這里處理我們的業(yè)務(wù)代碼
// 當(dāng)消費者把消息消費成功,再手動應(yīng)答RabbitMQ
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
// 如果發(fā)生了異常,我們一般的處理是直接扔掉死信隊列,一般這里出現(xiàn)錯誤都是消息有問題
// 如果消息出現(xiàn)問題,你重試再入隊列是無意義的
}
}

消息重試

如果消息消費時出現(xiàn)錯誤,你又希望能夠通過重試來盡可能的處理掉該消息,Spring也提供了相應(yīng)的重試機制。

修改配置:

spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtualHost: test
publisherConfirmType: correlated
publisherReturns: true
listener:
simple:
acknowledgeMode: auto
concurrency: 1
retry:
# 開啟重試
enabled: true
# 延遲1s后開始重試
initialInterval: 1000
# 每次消息重試的間隔乘數(shù)
multiplier: 3
# 2次間的重試最大間隔時間
maxInterval: 20000
maxAttempts: 4 #重試4次,1s, 3s, 9s
stateless: true #如果消息處理中存在事務(wù)則需要將其設(shè)置為false

如果只是做上面的配置,重試指定次數(shù)后消息將會被丟棄,這是默認(rèn)行為。Spring提供了 MessageRecoverer接口來決定消息如何處理。默認(rèn)Spring提供如下幾種實現(xiàn):

  1. ImmediateRequeueMessageRecoverer
  2. RejectAndDontRequeueRecoverer
  3. RepublishMessageRecoverer

我們只需要定義一個Bean為MessageRecoverer即可,這里我們就用Spring提供的RepublishMessageRecoverer重新發(fā)布消息。

@Bean
public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {
return new RepublishMessageRecoverer(rabbitTemplate, "error.exchange", "error") ;
}

這里將消息重新發(fā)布一個專門的隊列(重試指定次數(shù)后)。

責(zé)任編輯:姜華 來源: 今日頭條
相關(guān)推薦

2020-10-14 08:36:10

RabbitMQ消息

2021-04-27 07:52:18

RocketMQ消息投遞

2024-05-09 08:04:23

RabbitMQ消息可靠性

2024-12-18 07:43:49

2021-02-02 11:01:31

RocketMQ消息分布式

2023-12-04 09:23:49

分布式消息

2020-09-27 07:44:08

RabbitMQ投遞消息

2023-11-27 17:29:43

Kafka全局順序性

2024-08-12 12:17:03

2024-05-23 12:11:39

2021-09-07 10:38:37

RabbitMQ 高可用消費

2023-11-30 18:03:02

TCP傳輸

2024-09-02 09:14:36

SpringRabbitMQ數(shù)據(jù)

2024-09-05 08:58:37

2022-12-14 08:23:30

2021-04-15 09:17:01

SpringBootRocketMQ

2009-08-27 10:01:27

ibmdw云計算

2024-11-04 08:02:23

SpringRabbitMQ中間件

2024-12-24 08:44:55

ActiveMQRabbitMQ交換機

2022-01-10 11:58:51

SpringBootPulsar分布式
點贊
收藏

51CTO技術(shù)棧公眾號

主站蜘蛛池模板: 精品久久精品 | 国产一区欧美 | 欧美aa在线| 久久最新精品视频 | 日韩黄色免费 | 国产精品一区二区在线 | 亚洲不卡在线观看 | 国产亚洲精品一区二区三区 | 久久久99精品免费观看 | 欧美激情综合网 | www.男人天堂.com | 欧美激情视频一区二区三区免费 | 中文字幕在线观看一区二区 | 综合久久国产 | 欧美日韩国产中文字幕 | 成人免费视频网站在线观看 | 播放一级毛片 | 亚洲精品电影网在线观看 | 男女羞羞视频大全 | 一本色道久久综合亚洲精品高清 | 成人一区二区视频 | www.天堂av.com| 亚洲精品乱码 | 久久夜视频 | 成人在线小视频 | 超碰伊人久久 | 欧美亚洲一区二区三区 | 人人看人人射 | 欧美一二区 | 国产女人第一次做爰毛片 | 国产精品成人久久久久 | 欧美一区二区三区四区视频 | 中文字幕一区二区三区不卡 | 新疆少妇videos高潮 | 亚洲精品成人在线 | www精品美女久久久tv | 亚洲第一网站 | 精产国产伦理一二三区 | 美女视频h| 国产成人精品一区二区 | 国产精品久久久久久久7电影 |