使用 SQL 的方式查詢消息隊列數據以及踩坑指南
背景
為了讓業務團隊可以更好的跟蹤自己消息的生產和消費狀態,需要一個類似于表格視圖的消息列表,用戶可以直觀的看到發送的消息;同時點擊詳情后也能查到消息的整個軌跡。
消息列表
點擊詳情后查看軌跡
原理介紹
由于 Pulsar 并沒有關系型數據庫中表的概念,所有的數據都是存儲在 Bookkeeper 中,為了模擬使用 SQL 查詢的效果 Pulsar 提供了 Presto (現在已經更名為 Trino)的插件。
Trino 是一個分布式的 SQL 查詢引擎,它也提供了插件能力,如果我們想通過 SQL 從自定義數據源查詢數據時,基于它的 SPI 編寫一個插件是很方便的。
這樣便可以類似于查詢數據庫一樣查詢 Pulsar 數據:
Pulsar 插件的運行流程如上圖所示:
- 啟動的時候通過 Pulsar-Admin 接口獲取一些元數據,比如 Scheme,topic 分區信息等。
- 然后會創建一個只讀的 Bookkeeper 客戶端,用于獲取數據。
- 之后根據 SQL 條件過濾數據即可。
相關代碼:
使用 Pulsar-SQL
使用起來也很簡單,官方提供了兩個命令:
- sql-worker: 會啟動一個 trino 服務端同時運行了 Pulsar 插件.
- sql: 就是一個 SQL 命令行終端。
遇到的問題
自己在本地運行的時候自然是沒問題,可是一旦想在生產運行,同時如果你的 Pulsar 集群是運行再 k8s 環境中時就會碰到一些問題。
無法使用現有 Trino 集群
首先第一個問題是如果生產環境已經有了一個 Trino 集群想要復用的時候就會碰到問題,常規流程是將 Pulsar 的插件復制到 Trino 的 Plugin 目錄,然后重啟 Trino 后就能使用該插件。
當然社區也是支持這么做的:
但是當我將 Pulsar-plugin 復制到 Trino 中運行的時候卻失敗了,整體的流程可以參考這個 issue:https://github.com/apache/pulsar/discussions/20941
簡單來說 Trino 的官方鏡像和 pulsar-plugin 并不能兼容,這個問題直接影響到我們是否可以在生產環境使用它。
但是手動編譯出來的 Trino 服務和插件是兼容的,可以直接運行。
因此我只能在本地編譯出 Trino 服務端和 pulsar-plugin 然后打包成一個鏡像來運行了,當然這樣的壞處就是無法利用到我們現有的 Trino 集群,又得重新部署一個了。
流程也比較麻煩:
- 首先是本地編譯 Pulsar-SQL 模塊
- 將生成物復制到當前目錄
- 執行 make docker 打出 docker 鏡像并上傳到私服
- 再執行 kubectl 將 trino 部署到 k8s 環境中
整個流程做下來加上和社區的溝通,更加確定這個功能應該是很少有人在生產環境使用的,畢竟第一個坑就很麻煩,更別提后續的問題了??。
Presto 插件不支持 AuthToken
第二個問題也是個深坑,當我把 Trino 部署好查詢數據的時候直接拋了一個調用 pulsar-admin 接口連接超時的異常。
結果排查了半天發現原來是 pulsar-plugin 里沒有提供 JWT 的驗證方式,而我們的 Pulsar 集群恰好是打開了 JWT 驗證的。
為此我只能先在本地修復了這個問題,同時也提交了 PR,預計會在下一個大版本合并吧:https://github.com/apache/pulsar/pull/20860。
新創建的 topic 查詢失敗
第二個問題是當查詢一個新創建的 topic 時,客戶端會直接 block,相關的復現流程在這里:https://github.com/apache/pulsar/issues/20910
這個問題還好,不是很致命,是我在本地測試的時候無意間發現的。
本地我已經修復了,后面也提交了一個 PR,目前還在討論中:https://github.com/apache/pulsar/pull/20911
查詢消息會丟失最后一條
這個問題也不是很嚴重,數據量少的時候會發現,就是在指定了消息發送時間的查詢條件時,最后一條消息會被過濾掉,相關 issue 在這里:https://github.com/apache/pulsar/issues/20919。
這個我只是定位到了原因,但不太清楚 為什么要這么做(-1),影響也不是很大,就放在這里擱置了。
Schema 不兼容
最后發現的一個問題是我們線上某些 topic 查詢數據的時候會拋出 Not a record: "string"的異常,但只是部分 topic,也排查了很久,整個源碼中沒有任何一個地方有這個異常。
https://github.com/apache/pulsar/issues/20945
根本原因是生產者生成的 schema 有問題,類型已經是 JSON 了,但是 schema 卻是 string,這樣導致 pulsar-plugin 在反序列化 schema 的時候拋出了異常,由于是 pb 反序列化拋出的異常,所以源碼中都搜索不到。
沒有問題的 topic 使用了正確的 schema
后續我也在本地修復了這個問題,當拋出異常后就將 schema 降級為基本類型進行解析。
不過本質問題還是客戶端使用有誤,如果對 schema 理解不準確的話還是建議使用 byte[] 吧,這樣至少兼容性不會有問題。相關 PR:https://github.com/apache/pulsar/pull/20955。
總結
Pulsar-SQL 是一個非常有用的功能,只是我們使用過程中確實發現了一些問題,大部分都已經修復了;希望對后續使用該功能的朋友有所幫助。