Pulsar升級自動化:一鍵搞定集群升級與測試
背景
由于我在公司內部負責維護 Pulsar,需要時不時的升級 Pulsar 版本從而和社區保持一致。
而每次升級過程都需要做相同的步驟:
- 安裝一個新版本的集群
- 觸發功能性測試
- 觸發性能測試
- 查看監控是否正常
- 應用有無異常日志
- 流量是否正常
- 各個組件的內存占用是否正常
- 寫入延遲是否正常
命令行工具
以上的流程步驟最好是全部一鍵完成,我們只需要人工檢測下監控是否正常即可。
于是我便寫了一個命令行工具,執行流程如下:
pulsar-upgrade-cli -h ok | at 10:33:18
A cli app for upgrading Pulsar
Usage:
pulsar-upgrade-cli [command]
Available Commands:
completion Generate the autocompletion script for the specified shell
help Help about any command
install install a target version
scale scale statefulSet of the cluster
Flags:
--burst-limit int client-side default throttling limit (default 100)
--debug enable verbose output
-h, --help help for pulsar-upgrade-cli
--kube-apiserver string the address and the port for the Kubernetes API server
--kube-as-group stringArray group to impersonate for the operation, this flag can be repeated to specify multiple groups.
--kube-as-user string username to impersonate for the operation
真實使用的 example 如下:
pulsar-upgrade-cli install \
--values ./charts/pulsar/values.yaml \
--set namespace=pulsar-test \
--set initialize=true \
--debug \
--test-case-schema=http \
--test-case-host=127.0.0.1 \
--test-case-port=9999 \
pulsar-test ./charts/pulsar -n pulsar-test
它的安裝命令非常類似于 helm,也是直接使用 helm 的 value.yaml 進行安裝;只是在安裝成功后(等待所有的 Pod 都處于 Running 狀態)會再觸發 test-case 測試,也就是請求一個 endpoint。
這個 endpoint 會在內部處理所有的功能測試和性能測試,具體細節就在后文分析。
同時還提供了一個 scale(擴、縮容) 命令,可以用修改集群規模:
# 縮容集群規模為0
./pulsar-upgrade-cli scale --replicase 0 -n pulsar-test
# 縮容為最小集群
./pulsar-upgrade-cli scale --replicase 1 -n pulsar-test
# 恢復為最滿集群
./pulsar-upgrade-cli scale --replicase 2 -n pulsar-test
這個需求是因為我們的 Pulsar 測試集群部署在了一個 servless 的 kubernetes 集群里,它是按照使用量收費的,所以在我不需要的使用的時候可以通過這個命令將所有的副本數量修改為 0,從而減少使用成本。
當只需要做簡單的功能測試時便回將集群修改為最小集群,將副本數修改為只可以提供服務即可。
而當需要做性能測試時就需要將集群修改為最高配置。
這樣可以避免每次都安裝新集群,同時也可以有效的減少測試成本。
實現原理
require (
github.com/spf13/cobra v1.6.1
github.com/spf13/pflag v1.0.5
helm.sh/helm/v3 v3.10.2
)
這個命令行工具本質上是參考了 helm 的命令行實現的,所有主要也是依賴了 helm 和 cobra。
下面以最主要的安裝命令為例,核心的是以下的步驟:
- 執行 helm 安裝(這里是直接使用的 helm 的源碼邏輯進行安裝)
- 等待所有的 Pod 成功運行
- 觸發 test-case 執行
- 等待測試用例執行完畢
- 檢測是否需要卸載安裝的集群
func (e *installEvent) FinishInstall(cfg *action.Configuration, name string) error {
bar.Increment()
bar.Finish()
clientSet, err := cfg.KubernetesClientSet()
if err != nil {
return err
}
ctx := context.Background()
ip, err := GetServiceExternalIp(ctx, clientSet, settings.Namespace(), fmt.Sprintf("%s-proxy", name))
if err != nil {
return err
}
token, err := GetPulsarProxyToken(ctx, clientSet, settings.Namespace(), fmt.Sprintf("%s-token-proxy-admin", name))
if err != nil {
return err
}
// trigger testcase
err = e.client.Trigger(context.Background(), ip, token)
return err
}
這里的 FinishInstall 需要獲取到新安裝的 Pulsar 集群的 proxy IP 地址和鑒權所使用的 token(GetServiceExternalIp()/GetPulsarProxyToken())。
將這兩個參數傳遞給 test-case 才可以構建出 pulsar-client.
這個命令的核心功能就是安裝集群和觸發測試,以及一些集群的基本運維能力。
測試框架
而關于這里的測試用例也有一些小伙伴咨詢過,如何對 Pulsar 進行功能測試。
其實 Pulsar 源碼中已經包含了幾乎所有我們會使用到的測試代碼,理論上只要新版本的官方鏡像已經推送了那就是跑了所有的單測,質量是可以保證的。
那為什么還需要做功能測試呢?
其實很很簡單,Pulsar 這類基礎組件官方都有提供基準測試,但我們想要用于生產環境依然需要自己做壓測得出一份屬于自己環境下的性能測試報告。
根本目的是要看在自己的業務場景下是否可以滿足(包括公司的軟硬件,不同的業務代碼)。
所以這里的功能測試代碼有一個很重要的前提就是:需要使用真實的業務代碼進行測試。
也就是業務在線上使用與 Pulsar 相關的代碼需要參考功能測試里的代碼實現,不然有些問題就無法在測試環節覆蓋到。
這里我就踩過坑,因為在功能測試里用的是官方的 example 代碼進行測試的,自然是沒有問題;但業務在實際使用時,使用到了一個 Schema 的場景,并沒有在功能測試里覆蓋到(官方的測試用例里也沒有??),就導致升級到某個版本后業務功能無法正常使用(雖然用法確實是有問題),但應該在我測試階段就暴露出來。
實現原理
以上是一個集群的功能測試報告,這里我只有 8 個測試場景(結合實際業務使用),考慮到未來可能會有新的測試用例,所以在設計這個測試框架時就得考慮到擴展性。
AbstractJobDefine job5 =
new FailoverConsumerTest(event, "故障轉移消費測試", pulsarClient, 20, admin);
CompletableFuture<Void> c5 = CompletableFuture.runAsync(job5::start, EXECUTOR);
AbstractJobDefine job6 = new SchemaTest(event,"schema測試",pulsarClient,20,prestoService);
CompletableFuture<Void> c6 = CompletableFuture.runAsync(job6::start, EXECUTOR);
AbstractJobDefine job7 = new VlogsTest(event,"vlogs test",pulsarClient,20, vlogsUrl);
CompletableFuture<Void> c7 = CompletableFuture.runAsync(job7::start, EXECUTOR);
CompletableFuture<Void> all = CompletableFuture.allOf(c1, c2, c3, c4, c5, c6, c7);
all.whenComplete((___, __) -> {
event.finishAll();
pulsarClient.closeAsync();
admin.close();
}).get();
對外提供的 trigger 接口就不貼代碼了,重點就是在這里構建測試任務,然后等待他們全部執行完畢。
@Data
public abstract class AbstractJobDefine {
private Event event;
private String jobName;
private PulsarClient pulsarClient;
private int timeout;
private PulsarAdmin admin;
public AbstractJobDefine(Event event, String jobName, PulsarClient pulsarClient, int timeout, PulsarAdmin admin) {
this.event = event;
this.jobName = jobName;
this.pulsarClient = pulsarClient;
this.timeout = timeout;
this.admin = admin;
}
public void start() {
event.addJob();
try {
CompletableFuture.runAsync(() -> {
StopWatch watch = new StopWatch();
try {
watch.start(jobName);
run(pulsarClient, admin);
} catch (Exception e) {
event.oneException(this, e);
} finally {
watch.stop();
event.finishOne(jobName, StrUtil.format("cost: {}s", watch.getTotalTimeSeconds()));
}
}, TestCase.EXECUTOR).get(timeout, TimeUnit.SECONDS);
} catch (Exception e) {
event.oneException(this, e);
}
}
/** run busy code
* @param pulsarClient pulsar client
* @param admin pulsar admin client
* @throws Exception e
*/
public abstract void run(PulsarClient pulsarClient, PulsarAdmin admin) throws Exception;
}
核心代碼就是這個抽象的任務定義類,其中的 start 函數用于定義任務執行的模版:
- 添加任務:具體實現是任務計數器+1
- 開始計時
- 執行抽血的 run 函數,具體實現交給子類
- 異常時記錄事件
- 正常執行完畢后也記錄事件
下面來看一個普通用例的實現情況:
就是重寫了 run() 函數,然后在其中實現具體的測試用例,斷言測試結果。
這樣當我們需要再添加用例的時候只需要再新增一個子類實現即可。
同時還需要定義一個事件接口,用于處理一些關鍵的節點:
public interface Event {
/**
* 新增一個任務
*/
void addJob();
/** 獲取運行中的任務數量
* @return 獲取運行中的任務數量
*/
TestCaseRuntimeResponse getRuntime();
/**
* 單個任務執行完畢
*
* @param jobName 任務名稱
* @param finishCost 任務完成耗時
*/
void finishOne(String jobName, String finishCost);
/**單個任務執行異常
* @param jobDefine 任務
* @param e 異常
*/
void oneException(AbstractJobDefine jobDefine, Exception e);
/**
* 所有任務執行完畢
*/
void finishAll();
}
其中 getRuntime 接口是用于在 cli 那邊查詢任務是否執行完畢的接口,只有任務執行完畢之后才能退出 cli。
監控指標
當這些任務運行完畢后我們需要重點查看應用客戶端和 Pulsar broker 端是否有異常日志。
同時還需要觀察一些關鍵的監控面板:
包含但不限于:
- 消息吞吐量
- broker 寫入延遲
- Bookkeeper 的寫入、讀取成功率,以及延遲。
當然還有 zookeeper 的運行情況也需要監控,限于篇幅就不一一粘貼了。
以上就是測試整個 Pulsar 集群的流程,當然還有一些需要優化的地方。
比如使用命令行還是有些不便,后續可能會切換到網頁上就可以操作。