成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

一個(gè)詭異的 Pulsar InterruptedException 異常

開(kāi)發(fā) 前端
今天收到業(yè)務(wù)團(tuán)隊(duì)反饋線上有個(gè)應(yīng)用往 Pulsar 中發(fā)送消息失敗了,經(jīng)過(guò)日志查看得知是發(fā)送消息時(shí)候拋出了 Java.lang.InterruptedException 異常。

背景

圖片

今天收到業(yè)務(wù)團(tuán)隊(duì)反饋線上有個(gè)應(yīng)用往 Pulsar 中發(fā)送消息失敗了,經(jīng)過(guò)日志查看得知是發(fā)送消息時(shí)候拋出了 java.lang.InterruptedException 異常。

和業(yè)務(wù)溝通后得知是在一個(gè) gRPC 接口中觸發(fā)的消息發(fā)送,大約持續(xù)了半個(gè)小時(shí)的異常后便恢復(fù)正常了,這是整個(gè)問(wèn)題的背景。

前置排查

拿到該問(wèn)題后首先排查下是否是共性問(wèn)題,查看了其他的應(yīng)用沒(méi)有發(fā)現(xiàn)類似的異常;同時(shí)也查看了 Pulsar broker 的監(jiān)控大盤,在這個(gè)時(shí)間段依然沒(méi)有波動(dòng)和異常。

這樣可以初步排除是 Pulsar 服務(wù)端的問(wèn)題。

接著便是查看應(yīng)用那段時(shí)間的負(fù)載情況,從應(yīng)用 QPS 到 JVM 的各個(gè)內(nèi)存情況依然沒(méi)發(fā)現(xiàn)有什么明顯的變化。

Pulsar 源碼排查

既然看起來(lái)應(yīng)用本身和 Pulsar broker 都沒(méi)有問(wèn)題的話那就只能從異常本身來(lái)排查了。

首先第一步要得知具體使用的是 Pulsar-client? 是版本是多少,因?yàn)闃I(yè)務(wù)使用的是內(nèi)部基于官方 SDK 封裝 springboot starter? 所以第一步還得排查這個(gè) starter 是否有影響。

通過(guò)查看源碼基本排除了 starter? 的嫌疑,里面只是簡(jiǎn)單的封裝了 SDK 的功能而已。

org.apache.pulsar.client.api.PulsarClientException: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.PulsarClientException: java.lang.InterruptedException at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1027) at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.send(TypedMessageBuilderImpl.java:91) at 
java.base/java.lang.Thread.run(Thread.java:834) Caused by: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.PulsarClientException: java.lang.InterruptedException at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.send(TypedMessageBuilderImpl.java:89) ... 49 common frames omitted Caused by: org.apache.pulsar.client.api.PulsarClientException: java.lang.InterruptedException
at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:775)
at org.apache.pulsar.client.impl.ProducerImpl.sendAsync$original$BWm7PPlZ(ProducerImpl.java:393)
at org.apache.pulsar.client.impl.ProducerImpl.sendAsync$original$BWm7PPlZ$accessor$i7NYMN6i(ProducerImpl.java)
at org.apache.pulsar.client.impl.ProducerImpl$auxiliary$EfuVvJLT.call(Unknown Source)
at org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstMethodsInter.intercept(InstMethodsInter.java:86)
at org.apache.pulsar.client.impl.ProducerImpl.sendAsync(ProducerImpl.java)
at org.apache.pulsar.client.impl.ProducerImpl.internalSendAsync(ProducerImpl.java:292)
at org.apache.pulsar.client.impl.ProducerImpl.internalSendWithTxnAsync(ProducerImpl.java:363)
at org.apache.pulsar.client.impl.PartitionedProducerImpl.internalSendWithTxnAsync(PartitionedProducerImpl.java:191)
at org.apache.pulsar.client.impl.PartitionedProducerImpl.internalSendAsync(PartitionedProducerImpl.java:167)
at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.sendAsync(TypedMessageBuilderImpl.java:103)
at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.send(TypedMessageBuilderImpl.java:82) ... 49 common frames omitted Caused by: java.lang.InterruptedException: null
at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1343)
at java.base/java.util.concurrent.Semaphore.acquire(Semaphore.java:318)
at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:758)

接下來(lái)便只能是分析堆棧了,因?yàn)?Pulsar-client 的部分實(shí)現(xiàn)源碼是沒(méi)有直接打包到依賴中的,反編譯的話許多代碼行數(shù)對(duì)不上,所以需要將官方的源碼拉到本地,切換到對(duì)于的分支進(jìn)行查看。

這一步稍微有點(diǎn)麻煩,首先是代碼庫(kù)還挺大的,加上之前如果沒(méi)有準(zhǔn)備好 Pulsar 的開(kāi)發(fā)環(huán)境的話估計(jì)會(huì)勸退一部分人;但其實(shí)大部分問(wèn)題都是網(wǎng)絡(luò)造成的,只要配置一些 Maven 鏡像多試幾次總會(huì)編譯成功。

我這里直接將分支切換到 branch-2.8。

從堆棧的頂部開(kāi)始排查 TypedMessageBuilderImpl.java:91:

圖片

看起來(lái)是內(nèi)部異步發(fā)送消息的時(shí)候拋了異常。

接著往下看到這里:

java.lang.InterruptedException 
at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:775) at

圖片

看起來(lái)是這里沒(méi)錯(cuò),但是代碼行數(shù)明顯不對(duì);因?yàn)?2.8 這個(gè)分支也是修復(fù)過(guò)幾個(gè)版本,所以中間有修改導(dǎo)致代碼行數(shù)與最新代碼對(duì)不上也正常。

semaphore.get().acquire();

不過(guò)初步來(lái)看應(yīng)該是這行代碼拋出的線程終端異常,這里看起來(lái)只有他最有可能了。

圖片

為了確認(rèn)是否是真的是這行代碼,這個(gè)文件再往前翻了幾個(gè)版本最終確認(rèn)了就是這行代碼沒(méi)錯(cuò)了。

我們點(diǎn)開(kāi)java.util.concurrent.Semaphore#acquire()的源碼。

/**
* <li>has its interrupted status set on entry to this method; or
* <li>is {@linkplain Thread#interrupt interrupted} while waiting
* for a permit,
* </ul>
* then {@link InterruptedException} is thrown and the current thread's
* interrupted status is cleared.
*
* @throws InterruptedException if the current thread is interrupted
*/
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted() ||
(tryAcquireShared(arg) < 0 &&
acquire(null, arg, true, true, false, 0L) < 0))
throw new InterruptedException();
}

通過(guò)源碼會(huì)發(fā)現(xiàn) acquire()? 函數(shù)確實(shí)會(huì)響應(yīng)中斷,一旦檢測(cè)到當(dāng)前線程被中斷后便會(huì)拋出 InterruptedException 異常。

定位問(wèn)題

所以問(wèn)題的原因基本確定了,就是在 Pulsar 的發(fā)送消息線程被中斷了導(dǎo)致的,但為啥會(huì)被中斷還需要繼續(xù)排查。

我們知道線程中斷是需要調(diào)用 Thread.currentThread().interrupt(); API的,首先猜測(cè)是否 Pulsar 客戶端內(nèi)部有個(gè)線程中斷了這個(gè)發(fā)送線程。

于是我在 pulsar-client 這個(gè)模塊中搜索了相關(guān)代碼:

圖片

排除掉和 producer 不相關(guān)的地方,其余所有中斷線程的代碼都是在有了該異常之后繼續(xù)傳遞而已;所以初步來(lái)看 pulsar-client 內(nèi)部沒(méi)有主動(dòng)中斷的操作。

既然 Pulsar 自己沒(méi)有做,那就只可能是業(yè)務(wù)做的了?

于是我在業(yè)務(wù)代碼中搜索了一下:

圖片

果然在業(yè)務(wù)代碼中搜到了唯一一處中斷的地方,而且通過(guò)調(diào)用關(guān)系得知這段代碼是在消息發(fā)送前執(zhí)行的,并且和 Pulsar 發(fā)送函數(shù)處于同一線程。

大概的偽代碼如下:

List.of(1, 2, 3).stream().map(e -> {
return CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
return e;
});
}
).collect(Collectors.toList()).forEach(f -> {
try {
Integer integer = f.get();
log.info("====" + integer);
if (integer==3){
TimeUnit.SECONDS.sleep(10);
Thread.currentThread().interrupt();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
});
MessageId send = producer.newMessage().value(msg.getBytes()).send();

執(zhí)行這段代碼可以完全復(fù)現(xiàn)同樣的堆棧。

幸好中斷這里還打得有日志:

圖片

通過(guò)日志搜索發(fā)現(xiàn)異常的時(shí)間和這個(gè)中斷的日志時(shí)間點(diǎn)完全重合,這樣也就知道根本原因了。

因?yàn)闃I(yè)務(wù)線程和消息發(fā)送線程是同一個(gè),在某些情況下會(huì)執(zhí)行 Thread.currentThread().interrupt();,其實(shí)單純執(zhí)行這行函數(shù)并不會(huì)發(fā)生什么,只要沒(méi)有去響應(yīng)這個(gè)中斷,也就是 Semaphore 源碼中的判斷了線程中斷的標(biāo)記:

public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted() ||
(tryAcquireShared(arg) < 0 &&
acquire(null, arg, true, true, false, 0L) < 0))
throw new InterruptedException();
}

但恰好這里業(yè)務(wù)中斷后自己并沒(méi)有去判斷這個(gè)標(biāo)記,導(dǎo)致 Pulsar 內(nèi)部去判斷了,最終拋出了這個(gè)異常。

總結(jié)

所以歸根結(jié)底還是這里的代碼不合理導(dǎo)致的,首先是自己中斷了線程但也沒(méi)使用,從而導(dǎo)致有被其他基礎(chǔ)庫(kù)使用的可能,所以會(huì)造成了一些不可預(yù)知的后果。

再一個(gè)是不建議在業(yè)務(wù)代碼中使用 Thread.currentThread().interrupt(); 這類代碼,第一眼根本不知道是要干啥,也不易維護(hù)。

其實(shí)本質(zhì)上線程中斷也是線程間通信的一種手段,有這類需求完全可以換為內(nèi)置的 BlockQueue 這類函數(shù)來(lái)實(shí)現(xiàn)。

責(zé)任編輯:姜華 來(lái)源: crossoverJie
相關(guān)推薦

2013-12-05 10:50:13

2024-06-28 08:28:43

反序列化filterJson

2023-12-12 08:08:17

插件PRPulsar

2013-03-18 10:31:22

JS異常

2022-11-28 08:37:23

MQ集群線程棧

2021-04-30 07:09:48

SQLP0事故

2021-10-13 10:22:10

Python多繼承開(kāi)發(fā)

2018-06-24 16:39:28

Tomcat異常線程

2011-04-11 09:53:06

Oracle

2024-10-24 08:21:33

2021-01-06 05:25:56

項(xiàng)目Springboot應(yīng)用

2022-12-18 19:27:09

Pulsar編輯器開(kāi)源

2016-09-26 17:26:20

2020-03-16 17:20:02

異常處理Spring Boot

2014-10-14 15:50:19

UIAndroid

2009-04-01 08:37:19

Windows 7微軟操作系統(tǒng)

2023-01-03 12:30:25

架構(gòu)CPUGPU

2022-04-28 09:05:41

網(wǎng)絡(luò)爬蟲(chóng)Python

2015-06-10 10:56:50

iOS開(kāi)發(fā)技巧

2013-02-22 18:37:50

容錯(cuò)服務(wù)器
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)

主站蜘蛛池模板: 草草视频在线播放 | 日韩欧美二区 | 天堂一区二区三区四区 | 麻豆国产一区二区三区四区 | 欧美福利在线 | 成人欧美一区二区三区在线观看 | 青青操av | 毛片在线免费 | 精品欧美一区二区在线观看欧美熟 | 一区二区视频在线观看 | 九九免费 | 亚洲精品v | 欧美日韩亚洲成人 | 日韩精品视频在线 | 亚洲精品视频在线播放 | 国产成人免费在线观看 | 欧美在线一区二区三区 | 国产中文字幕在线 | 欧美日韩最新 | 91亚洲精品在线 | 99久久99| 91精品久久久久久久久久 | 91精品久久久久久久久中文字幕 | 久久久久久www | 欧美一区二区在线视频 | 午夜小电影| 久久久久久久久久久久久久久久久久久久 | 碰碰视频 | 欧美日韩在线免费观看 | 日韩一区二区三区四区五区六区 | 极品的亚洲 | 日本人做爰大片免费观看一老师 | 国产精品国产自产拍高清 | 91偷拍精品一区二区三区 | 精品无码久久久久久久动漫 | 久久91精品国产一区二区 | 一区二区三区在线播放 | 中文字幕一区二区三区四区五区 | 97国产在线观看 | 久久久99精品免费观看 | 国产精品久久久久久久久久久新郎 |