成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

通過 Pulsar 源碼徹底解決重復消費問題

開發 前端
如果業務執行耗時,等到消息從那個單線程的無界隊列中取出來的時候很有可能已經過了 ackTimeou 的時間,從而導致了超時重發。

背景

最近真是和 Pulsar 杠上了,業務團隊反饋說是線上有個應用消息重復消費。

圖片

而且在測試環境是可以穩定復現的,根據經驗來看一般能穩定復現的都比較好解決。

定位問題

接著便是定位問題了,根據之前的經驗讓業務按照這幾種情況先排查一下:

圖片

通過排查:1,2可以排除了。

  1. 沒有相關日志
  2. 存在異常,但最外層也捕獲了,所以不管有無異常都會 ACK。

第三個也在消費的入口和提交消息出計算了時間,最終發現都是在2s左右 ACK 的。

偽代碼如下:

Consumer consumer = client.newConsumer()
.subscriptionType(SubscriptionType.Shared)
.enableRetry(true)
.topic(topic)
.ackTimeout(30, TimeUnit.SECONDS)
.subscriptionName("my-sub")
.messageListener(new MessageListener<byte[]>() {
@SneakyThrows
@Override
public void received(Consumer<byte[]> consumer, Message<byte[]> msg) {
log.info("msg_id{}",msg.getMessageId().toString());
TimeUnit.SECONDS.sleep(2);
consumer.acknowledge(msg);
}
})
.subscribe();

那這就很奇怪了,因為代碼里配置的 ackTimeout 是 30s,理論上來說是不會存在超時導致消息重發的。

為了排除是否是超時引起的,直接將業務代碼注釋掉了,等于是消息收到后立即就 ACK,經過測試發現這樣確實就沒有重復消費了。

為了再次確認是不是和 ackTimeout 有關,直接將 .ackTimeout(30, TimeUnit.SECONDS) 注釋掉后測試,發現也沒有重復消費了。

確認原因

既然如此那一定是和這個配置有關了,但看代碼確實沒有超時,為了定位具體原因只有去看 client 的源碼了。

這里簡單梳理下消息的消費的流程:

  1. 根據 .receiverQueueSize(1000) 的配置,默認情況下 broker 會直接給客戶端推送 1000 條消息。
  2. 客戶端將這 1000 條消息保存到內部隊列中。
  3. 如果使用同步消費 receive()? 時,本質上就是去 take 這個內部隊列。
  4. 如果是使用的是 messageListener? 異步消費并配置 ackTimeout?,每當從隊列里獲得一條消息后便會把這條消息加入 UnAckedMessageTracker? 內部的一個時間輪中,定時檢測頂部是否存在消息,如果存在則會觸發重新投遞。4.1 加入時間輪后,異步調用我們自定義的事件,這個異步操作是提交到一個無界隊列中由單個線程依次排隊執行(這點是這次問題的關鍵)
  5. 業務 ACK 的時候會從時間輪中刪除消息,所以如果消息 ACK 的足夠快,在第四步就不會獲取到消息進行重新投遞。

圖片

整體流程如上圖,代碼細節如下圖:

圖片

所以問題的根本原因就是寫入時間輪(UnAckedMessageTracker)開始倒計時的線程和回調業務邏輯的不是同一個線程。

如果業務執行耗時,等到消息從那個單線程的無界隊列中取出來的時候很有可能已經過了 ackTimeou 的時間,從而導致了超時重發。

也就是用戶所理解的 ackTimeout 周期(應該進入回調時候開始計時)和 SDK 實現的不一致造成的。

之后我再次確認同樣的代碼換為同步消費是沒有問題的,不會導致重復消費:

while (true) {
Message msg = consumer.receive();
log.info(
"consumer Message received: " + new String(msg.getData()) + msg.getMessageId().toString());
TimeUnit.SECONDS.sleep(2);
consumer.acknowledge(msg);
}

查看代碼后發現同步代碼的獲取消息和加入 UnAckedMessageTracker 時間輪是同步的,也就不會出現超時的問題。

圖片

總結

所以其實 是messageListener? 異步消費的 ackTimeout 的語義是有問題的,需要將加入 UnAckedMessageTracker 處移動到回調函數中同步調用。

我查看了最新的 2.11.x 版本的代碼依然沒有修復,正準備提個 PR 切換到 master 時才發現已經有相關的 PR 了,只是還沒有發版。

修復的背景和思路也是類似的,具體參考:

https://github.com/apache/pulsar/pull/18911

其實業務中并不推薦使用 ackTimeout 這個配置了,不好預估時間從而導致超時,而且我相信大部分業務配置好 ackTImeout 后直到后續出問題的時候才想起來要改。所以干脆一開始就不要使用。

在 go 版本的 SDK 中直接廢棄掉了這個參數,推薦使用 nack API 替換。

圖片

責任編輯:武曉燕 來源: crossoverJie
相關推薦

2021-12-03 12:15:01

QT中文亂碼Windows

2009-11-27 10:31:02

GPRS路由

2025-03-03 00:13:50

2010-01-11 18:05:24

VB.NET窗體繼承

2010-01-04 15:05:53

2023-11-28 08:36:16

Spring中Body讀取

2009-12-25 09:39:08

ADSL MODEM

2025-06-17 06:40:45

DockerDocker鏡像

2010-01-14 10:19:05

2009-11-24 19:50:10

2009-12-03 18:45:41

2022-10-08 23:55:58

iOS蘋果開發

2024-11-04 10:05:00

AI模型

2020-09-28 14:41:24

Event Loop

2009-12-03 16:54:04

無線寬帶路由器

2009-12-04 16:25:24

2009-12-21 17:20:19

2022-05-31 09:01:13

GitHub工具安全

2018-09-18 11:28:01

2009-12-21 14:12:30

路由器配置故障
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 亚洲成人一区二区 | 99视频在线免费观看 | 国产精品久久久久aaaa九色 | 中文字幕免费 | 国产欧美一区二区三区日本久久久 | 伊人网影院 | 精品视频在线观看 | 久久久91| 日韩中文一区 | 国产精品视频播放 | 精品福利在线视频 | 91精品国产综合久久久动漫日韩 | 黄色av免费| 高清国产午夜精品久久久久久 | 视频1区 | 波多野结衣电影一区 | 中文av字幕| 亚洲永久精品国产 | 日本欧美国产在线 | av先锋资源 | 日韩精品久久一区二区三区 | 亚洲欧美中文日韩在线v日本 | 国产成人高清视频 | 91精品国产欧美一区二区 | 欧美一区二区三区大片 | 特黄特色大片免费视频观看 | 精品久久国产 | 精品亚洲视频在线 | av黄色免费 | 亚洲一区二区在线视频 | 欧美精| 日本啊v在线 | 亚洲精选一区二区 | 91不卡| 国产不卡在线播放 | 一区二区三区免费 | 中文字幕 在线观看 | 久久综合久 | 精品久久久久久久 | 国产成人99久久亚洲综合精品 | 亚洲成人精品 |