從 Pulsar Client 的原理到它的監(jiān)控面板
背景
前段時(shí)間業(yè)務(wù)團(tuán)隊(duì)偶爾會(huì)碰到一些 Pulsar 使用的問題,比如消息阻塞不消費(fèi)了、生產(chǎn)者消息發(fā)送緩慢等各種問題。
雖然我們有個(gè)監(jiān)控頁面可以根據(jù) topic 維度查看他的發(fā)送狀態(tài),比如速率、流量、消費(fèi)狀態(tài)等信息。
但也有幾個(gè)問題:
- 無法在應(yīng)用維度查看他所依賴的所有 topic 的各種狀態(tài)。
- 監(jiān)控的信息還不夠,比如發(fā)送/消費(fèi)延遲、發(fā)送/消費(fèi)失敗等數(shù)據(jù)。
總之就是缺少一個(gè)全局的監(jiān)控視角,通過這些指標(biāo)可以很方便的分析出當(dāng)時(shí)的運(yùn)行情況。
基于這個(gè)需求經(jīng)過一段時(shí)間的折騰,現(xiàn)在已經(jīng)上線使用幾個(gè)月,目前比較穩(wěn)定,效果圖如下:
現(xiàn)在就可以在每個(gè)應(yīng)用的監(jiān)控面板里看到自己使用了哪些 topic,分別的生產(chǎn)消費(fèi)情況如何。
核心流程
要實(shí)現(xiàn)這些功能就得在應(yīng)用的 metrics 中加入相關(guān)的監(jiān)控信息,但官方的 Java client 是沒有暴露出這些指標(biāo)的。
但 pulsar-client-go 是自帶了這些指標(biāo)的
由于 SDK 不支持所以只能自己想辦法實(shí)現(xiàn)了,為此其實(shí)有兩種實(shí)現(xiàn)方案:
- 魔改 Java client,在需要監(jiān)控的地方手動(dòng)埋點(diǎn)指標(biāo)。
- 由于我們使用了 SkyWalking,所以可以編寫插件,以 agent 的方式獲取數(shù)據(jù)、埋點(diǎn)指標(biāo)。
不過第一種方案有以下一些問題:
- 需要自己維護(hù)一個(gè)代碼分支,還需要定期和官方保持一致,難免會(huì)出現(xiàn)代碼沖突。
- 需要推動(dòng)業(yè)務(wù)方進(jìn)行依賴升級(jí),線上有著幾百個(gè)應(yīng)用,推動(dòng)起來時(shí)間太慢。
第二種方案的好處就不言而喻了:
- 升級(jí)無感知,只需要在我們的基礎(chǔ)鏡像中加上插件即可。
- Java client 的版本也更容易統(tǒng)一。
Client 原理
但其實(shí)不管是哪種方案我們都得熟悉 Java Client 的實(shí)現(xiàn)原理,才能知道哪些數(shù)據(jù)是我們需要重點(diǎn)關(guān)注的,可以幫助我們更好的定位問題。
本文重點(diǎn)不在于此,具體代碼就不仔細(xì)分析了。
從上圖可以看出,如果我們想要監(jiān)控消費(fèi)是否存在阻塞的情況,這幾個(gè)內(nèi)部隊(duì)列是需要重點(diǎn)監(jiān)控的,一旦他們出現(xiàn)堆積,那就會(huì)出現(xiàn)消費(fèi)阻塞。
其實(shí)這些數(shù)據(jù)都可以通過。
org.apache.pulsar.client.api.ProducerStats
org.apache.pulsar.client.api.ConsumerStats
這兩個(gè)接口獲取到生產(chǎn)者和消費(fèi)者的大部分指標(biāo),只是這里還有一個(gè)小插曲。
那就是在獲取消費(fèi)者隊(duì)列大小的時(shí)候,獲取到的數(shù)據(jù)一直為空。
最終經(jīng)過源碼排查,原來是我們大量使用的 messageListener 在獲取隊(duì)列大小時(shí)有 bug,導(dǎo)致獲取到的數(shù)據(jù)一直都為 0.
相關(guān)的 issue 和 PR 可以在這兩個(gè)鏈接查看,問題原因和修復(fù)過程都有具體描述:https://github.com/apache/pulsar/issues/20076 https://github.com/apache/pulsar/pull/20245
但這個(gè)修復(fù)得在新版本才能使用,就導(dǎo)致我們現(xiàn)在的監(jiān)控頁面一直顯示為空。
開發(fā) SkyWalking 插件
然后就是開發(fā)一個(gè) SkyWalking 的插件了,其實(shí)直接使用 SW 開發(fā)插件是上手 Java-Agent 比較快的方式。
SW 的 SDK 封裝了許多 agent 原生接口,使得開發(fā)起來非常容易;當(dāng)然缺點(diǎn)也有,就是得集成整個(gè) SW 的 agent。
這里我簡單介紹下這個(gè)插件的運(yùn)行流程:
- 在創(chuàng)建和刪除 consumer 的時(shí)候維護(hù) consumerPool
- 啟動(dòng)一個(gè)定時(shí)任務(wù),定期從這些 consumer 中獲取指標(biāo)數(shù)據(jù)。
當(dāng)消費(fèi)多分區(qū) topic 時(shí),為了能唯一標(biāo)志一個(gè) consumer,所以給每個(gè)消費(fèi)者都加了一個(gè) hashcode 的 label。
因?yàn)槲覀兯械?Java 技術(shù)棧都是使用的 Prometheus 的包來生成 metrics ,所以該插件也是使用該包生成的數(shù)據(jù)。
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient</artifactId>
<version>0.12.0</version>
<scope>provided</scope>
</dependency>
為了兼容一些特殊 Java 應(yīng)用沒有該包時(shí)會(huì)啟動(dòng)報(bào)錯(cuò),所以在初始化插件的時(shí)候需要檢測(cè)當(dāng)前 classpath
下是否存在該依賴。
這些功能 SW 已經(jīng)封裝好了,對(duì)我們來說也是開箱即用。
其實(shí) SW 插件自己也是支持 metrics 的,由于我們只是使用了它的 trace 功能,所以這里就沒有使用它的 API。
關(guān)于開發(fā)一個(gè) SW 插件的流程也比較簡單,可以參考官方文檔或者是一些現(xiàn)成的插件源碼。https://skywalking.apache.org/docs/skywalking-java/next/en/setup/service-agent/java-agent/java-plugin-development-guide/
總結(jié)
有了這個(gè)監(jiān)控面板后,對(duì)于 Pulsar 客戶端內(nèi)部的一些運(yùn)行情況就不再是黑盒了,還可以基于此做一些報(bào)警,比如消費(fèi)堆積、發(fā)送延遲過大等。
當(dāng)然僅僅只有這個(gè)面板依然是不夠的,后續(xù)我們又開發(fā)了可以通過 messageId
查詢它的整個(gè)生命周期,包括:
- 生產(chǎn)者、消費(fèi)者信息
- 消息生產(chǎn)時(shí)間
- 推送時(shí)間
- ack 時(shí)間等
同時(shí)借助與 Pulsar-SQL 的能力,還能以列表的形式展示當(dāng)前 topic 的消息列表。
當(dāng)然在實(shí)現(xiàn)這兩個(gè)功能的同時(shí)也踩了不少坑,提了幾個(gè) PR ,后面在抽時(shí)間做具體的分享。