基于Prometheus的自動化巡檢
前言
目前,大部分公司都采用Prometheus + Grafana
這一套來做指標監(jiān)控,所以在Prometheus
中也有大量的指標數據。為了滿足日常工作中的巡檢,可以基于Prometheus
實現自動巡檢,減輕部分運維壓力。
思路
為了靈活管理巡檢任務,將整個巡檢功能進行了拆分管理,分為:
數據源管理
:可以管理多個Prometheus
數據源,后期也可以增加其他數據源,比如ES
等。巡檢項管理
:目前的巡檢項
就是各種Prometheus規(guī)則
,之所以要單獨進行管理,是為了在多數據源、多集群等情況下進行復用。標簽管理
:目前是Prometheus
的label
,也是為了方便復用巡檢項
,巡檢項
和標簽
可以靈活進行組合。任務編排
:編排各種巡檢任務。執(zhí)行作業(yè)
:配置定時的巡檢作業(yè),它由多個編排的任務組成。巡檢報告
:便于查看、導出巡檢結果。巡檢通知
:巡檢結果可以通知到企業(yè)微信
群,便于業(yè)務方快速知道目前整個系統有沒有問題。
圖片
效果
數據源管理
(1)添加數據源
(2)數據源列表
巡檢項管理
(1)添加巡檢項
(2)巡檢項列表
標簽管理
(1)添加標簽
(2)標簽列表
任務編排
(1)創(chuàng)建任務編排
(2)任務列表
執(zhí)行作業(yè)
(1)創(chuàng)建執(zhí)行作業(yè)
(2)作業(yè)列表
巡檢報告
每次巡檢完成都會生成對應的巡檢報告。
點擊詳情可以看到巡檢的具體結果。
點擊導出,即可將報告導出為PDF。
如果配置了巡檢通知,則會將對應的巡檢結果發(fā)送到企業(yè)微信群。
代碼實現
大部分的代碼都是普通的CRUD
,比如數據源的管理、巡檢項的管理都是基礎的CRUD
,沒有什么好說的。
這里簡單說一下具體巡檢的實現。
(1)當用戶創(chuàng)建了執(zhí)行作業(yè)
且該作業(yè)處于開啟
狀態(tài),就會創(chuàng)建一個定時任務。
// CreateCronTask 創(chuàng)建定時任務
func (inspectionExecutionJobService *InspectionExecutionJobService) CreateCronTask(job *AutoInspection.InspectionExecutionJob) error {
cronName := fmt.Sprintf("InspectionExecution_%d", job.ID)
taskName := fmt.Sprintf("InspectionExecution_%d", job.ID)
// 檢查是否已存在相同的定時任務
if _, found := global.GVA_Timer.FindTask(cronName, taskName); found {
// 如果已存在,先清除舊的定時任務
global.GVA_Timer.Clear(cronName)
}
// 創(chuàng)建定時任務
var option []cron.Option
option = append(option, cron.WithSeconds())
// 添加定時任務
if _, err := global.GVA_Timer.AddTaskByFunc(cronName, job.CronExpr, func() {
// 執(zhí)行巡檢任務
inspectionExecutionJobService.ExecuteInspectionJob(job)
}, taskName, option...); err != nil {
global.GVA_LOG.Error("創(chuàng)建定時任務失敗", zap.Error(err), zap.Uint("jobID", job.ID))
return err
}
// 更新下次執(zhí)行時間
nextTime := inspectionExecutionJobService.calculateNextRunTime(job.CronExpr)
job.NextRunTime = &nextTime
// 更新數據庫中的記錄
return global.GVA_DB.Model(job).Updates(map[string]interface{}{
"next_run_time": job.NextRunTime,
}).Error
}
Tips:因為是采用的
gin-vue-admin
框架,所以直接使用框架自帶的timer定時器。
(2)當執(zhí)行時間到了,就會執(zhí)行ExecuteInspectionJob
巡檢任務。
func (inspectionExecutionJobService *InspectionExecutionJobService) ExecuteInspectionJob(job *AutoInspection.InspectionExecutionJob) {
// 更新作業(yè)執(zhí)行時間
inspectionExecutionJobService.updateJobExecutionTime(job)
// 創(chuàng)建執(zhí)行記錄
jobExecution := inspectionExecutionJobService.createJobExecution(job)
if jobExecution == nil {
return
}
// 執(zhí)行所有關聯的巡檢任務并收集結果
allResults := inspectionExecutionJobService.executeAllInspectionTasks(job, jobExecution)
global.GVA_LOG.Info("執(zhí)行完成", zap.Any("results", allResults))
// 更新執(zhí)行記錄狀態(tài)和結果
inspectionExecutionJobService.updateJobExecutionResult(jobExecution, allResults)
// 發(fā)送通知
if *job.IsNotice {
inspectionExecutionJobService.sendInspectionNotification(job, jobExecution, allResults)
}
}
這里主要是executeAllInspectionTasks
來執(zhí)行巡檢任務。
// executeAllInspectionTasks 執(zhí)行所有關聯的巡檢任務并收集結果
func (inspectionExecutionJobService *InspectionExecutionJobService) executeAllInspectionTasks(job *AutoInspection.InspectionExecutionJob, jobExecution *AutoInspection.JobExecution) []*result.ProductsResult {
// 創(chuàng)建一個等待組來同步所有巡檢任務
var wg sync.WaitGroup
// 創(chuàng)建一個互斥鎖來保護結果集
var mu sync.Mutex
// 創(chuàng)建一個結果集合
allResults := make([]*result.ProductsResult, 0)
// 執(zhí)行所有關聯的巡檢任務
for _, jobID := range job.JobIds {
wg.Add(1)
gofunc(id uint) {
defer wg.Done()
// 執(zhí)行單個巡檢任務并獲取結果
result := inspectionExecutionJobService.executeSingleInspectionTask(id, jobExecution)
if result != nil {
// 將結果添加到總結果集中
mu.Lock()
allResults = append(allResults, result)
mu.Unlock()
}
}(jobID)
}
// 等待所有巡檢任務完成
wg.Wait()
return allResults
}
它會把作業(yè)
中的任務拆成單個任務,然后由executeSingleInspectionTask
分別執(zhí)行并收集執(zhí)行結果。
// executeSingleInspectionTask 執(zhí)行單個巡檢任務
func (inspectionExecutionJobService *InspectionExecutionJobService) executeSingleInspectionTask(jobID uint, jobExecution *AutoInspection.JobExecution) *result.ProductsResult {
global.GVA_LOG.Info("執(zhí)行巡檢任務", zap.Uint("jobID", jobID))
// 獲取巡檢任務信息
inspectionJob, _ := inspectionJobService.GetInspectionJob(fmt.Sprintf("%d", jobID))
// 創(chuàng)建結果通道
resultCh := make(chan *result.ProductsResult)
// 創(chuàng)建一個用于等待結果的WaitGroup
var resultWg sync.WaitGroup
resultWg.Add(1)
// 用于存儲結果的變量
var taskResult *result.ProductsResult
// 啟動一個goroutine來接收結果
gofunc() {
defer resultWg.Done()
result := <-resultCh
global.GVA_LOG.Info("巡檢任務執(zhí)行完成",
zap.String("jobName", inspectionJob.Name),
zap.Any("result", result))
// 保存結果
taskResult = result
}()
// 執(zhí)行巡檢任務
inspectionExecutionJobService.ExecuteInspectionTask(&inspectionJob, jobExecution, resultCh)
// 等待結果接收完成
resultWg.Wait()
return taskResult
}
在ExecuteInspectionTask
中是為了方便擴展數據源。
func (inspectionExecutionJobService *InspectionExecutionJobService) ExecuteInspectionTask(inspectionJob *AutoInspection.InspectionJob, jobExecution *AutoInspection.JobExecution, resultCh chan *result.ProductsResult) {
switch inspectionJob.DataSourceType {
case "prometheus":
// 執(zhí)行Prometheus巡檢任務
inspectionExecutionJobService.ExecutePrometheusInspectionTask(inspectionJob, jobExecution, resultCh)
}
}
由于目前只有Prometheus
數據源,所以將直接執(zhí)行ExecutePrometheusInspectionTask
。在這個方法中主要是構造Prometheus
規(guī)則然后進行巡檢。
// ExecutePrometheusInspectionTask 執(zhí)行Prometheus巡檢任務
func (inspectionExecutionJobService *InspectionExecutionJobService) ExecutePrometheusInspectionTask(inspectionJob *AutoInspection.InspectionJob, jobExecution *AutoInspection.JobExecution, resultCh chan *result.ProductsResult) {
// 執(zhí)行Prometheus巡檢任務的邏輯
var inspectionItemsService InspectionItemsService
var inspectionTagService InspectionTagService
var dataSourceService DataSourceService
// 獲取數據源信息
dataSource, _ := dataSourceService.GetDataSource(fmt.Sprintf("%d", inspectionJob.DataSourceId))
// 創(chuàng)建規(guī)則列表
prometheusRules := make([]*product.PrometheusRule, 0, len(inspectionJob.ItemLabelMaps))
// 遍歷巡檢項與標簽映射關系
for _, itemLabelMap := range inspectionJob.ItemLabelMaps {
// 獲取巡檢項信息
inspectionItem, _ := inspectionItemsService.GetInspectionItems(fmt.Sprintf("%d", itemLabelMap.ItemId))
// 獲取標簽信息
var inspectionTag AutoInspection.InspectionTag
if itemLabelMap.LabelId != 0 {
inspectionTag, _ = inspectionTagService.GetInspectionTag(fmt.Sprintf("%d", itemLabelMap.LabelId))
}
// 創(chuàng)建Prometheus規(guī)則
prometheusRule := &product.PrometheusRule{
Name: inspectionItem.Name,
Rule: inspectionItem.Rule,
LabelFilter: inspectionTag.Label,
Desc: inspectionItem.Description,
AlertInfo: inspectionItem.OutputTemplate,
DataSourceName: dataSource.Name,
}
// 添加到規(guī)則列表
prometheusRules = append(prometheusRules, prometheusRule)
}
// 創(chuàng)建規(guī)則集合
rules := product.Rules{
Prometheus: prometheusRules,
AliyunSafe: []*product.AliyunSafeRule{}, // 空列表,因為這里只處理Prometheus規(guī)則
}
// 創(chuàng)建產品
prod := &product.Product{
Name: inspectionJob.Name,
Rules: rules,
}
// 使用defer和recover捕獲可能的panic
deferfunc() {
if r := recover(); r != nil {
// 記錄panic信息
global.GVA_LOG.Error("執(zhí)行巡檢任務發(fā)生panic",
zap.Any("panic", r),
zap.String("jobName", inspectionJob.Name))
// 創(chuàng)建一個表示失敗的結果并發(fā)送到結果通道
pr := &result.ProductsResult{ProductName: inspectionJob.Name}
// 為每個規(guī)則創(chuàng)建失敗結果
for _, rule := range prometheusRules {
errorMsg := fmt.Sprintf("巡檢執(zhí)行失敗: %v", r)
failureResult := result.NewRuleResult(
result.WithInspectionInfo(rule.Name),
result.WithInspectionResult(result.ABNORMAL),
result.WithInspectionErrorInfo(
[]map[string]string{{
"error": errorMsg,
"rule": rule.Rule,
}},
"執(zhí)行規(guī)則 {{rule}} 時發(fā)生錯誤: {{error}}",
),
)
pr.Add(failureResult)
}
// 發(fā)送結果
resultCh <- pr
}
}()
// 執(zhí)行巡檢
err = prod.Run(resultCh)
if err != nil {
global.GVA_LOG.Error("執(zhí)行巡檢任務失敗", zap.Error(err), zap.String("jobName", inspectionJob.Name))
return
}
global.GVA_LOG.Info("巡檢任務已啟動", zap.String("jobName", inspectionJob.Name))
}
在prod.Run
中,會去做真正的指標數據查詢。
func (p *Product) Run(resultCh chan *result.ProductsResult) error {
global.GVA_LOG.Info(fmt.Sprintf("開始巡檢, %s", p.Name))
pr := &result.ProductsResult{ProductName: p.Name}
// prometheus巡檢規(guī)則
for _, prometheusRule := range p.Rules.Prometheus {
ruleInspectRes, err := prometheusRule.Run()
if err != nil {
return err
}
pr.Add(ruleInspectRes)
}
resultCh <- pr
returnnil
}
然后調用prometheusRule.Run
獲取結果。
func (r *PrometheusRule) Run() (*result.RuleResult, error) {
ds, err := datasource.GetByName(r.DataSourceName)
if err != nil {
returnnil, err
}
pds, ok := ds.(*datasource.PrometheusDataSource)
if !ok {
returnnil, fmt.Errorf("數據源類型錯誤: %s 不是Prometheus數據源", r.DataSourceName)
}
if pds.Client == nil {
returnnil, fmt.Errorf("數據源為空: %s", r.DataSourceName)
}
res, err := pds.Run(r.Rule, r.LabelFilter)
if err != nil {
returnnil, err
}
ruleRes := r.buildRuleResult(res)
return ruleRes, nil
}
func (r *PrometheusRule) buildRuleResult(resultLabels []map[string]string) *result.RuleResult {
iflen(resultLabels) == 0 {
return result.NewRuleResult(result.WithInspectionInfo(fmt.Sprintf("%s", r.Name)),
result.WithInspectionResult(result.NORMAL))
}
return result.NewRuleResult(result.WithInspectionInfo(fmt.Sprintf("%s", r.Name)),
result.WithInspectionResult(result.ABNORMAL),
result.WithInspectionErrorInfo(resultLabels, r.AlertInfo))
}
具體的查詢是封裝在pds.Run
中的,它會去調用Prometheus
的接口去查詢數據。
func Query(client api.Client, rule string) (model.Value, []string, error) {
// 添加空指針檢查
if client == nil {
returnnil, nil, errors.New("Prometheus client is nil")
}
v1Api := promV1.NewAPI(client)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
value, warnings, err := v1Api.Query(ctx, rule, time.Now(), promV1.WithTimeout(10*time.Second))
global.GVA_LOG.Debug("查詢結果", zap.String("value", value.String()), zap.Any("warnings", warnings))
if err != nil {
returnnil, nil, errors.WithStack(err)
}
return value, warnings, nil
}
(3)如果需要發(fā)送到企業(yè)微信,就會構建發(fā)送結果進行發(fā)送。
func (inspectionExecutionJobService *InspectionExecutionJobService) sendInspectionNotification(job *AutoInspection.InspectionExecutionJob, jobExecution *AutoInspection.JobExecution, results []*result.ProductsResult) {
// 獲取通知配置
var notifyService = NotifyService{}
notify, err := notifyService.GetNotify(fmt.Sprintf("%d", job.NoticdId))
if err != nil {
global.GVA_LOG.Error("獲取通知配置失敗", zap.Error(err))
return
}
// 構建通知內容
// 1. 巡檢摘要
taskCount := len(results) // 巡檢任務數量
itemCount := 0 // 巡檢項數量
normalCount := 0 // 正常項數量
abnormalCount := 0 // 異常項數量
abnormalItems := []string{} // 異常項列表
// 統計巡檢項、正常項和異常項的數量
for _, task := range results {
itemCount += len(task.SubRuleResults)
for _, item := range task.SubRuleResults {
if item.InspectionResult == result.NORMAL {
normalCount++
} elseif item.InspectionResult == result.ABNORMAL {
abnormalCount++
// 收集異常項信息
abnormalDetail := fmt.Sprintf("【%s】%s", task.ProductName, item.InspectionInfo)
iflen(item.InspectionErrorInfo) > 0 {
abnormalDetail += "\n" + strings.Join(item.InspectionErrorInfo, "\n")
}
abnormalItems = append(abnormalItems, abnormalDetail)
}
}
}
// 格式化摘要信息
summary := fmt.Sprintf("巡檢任務%d個,巡檢項%d個,正常%d個,異常%d個", taskCount, itemCount, normalCount, abnormalCount)
// 構建企業(yè)微信通知內容
var content string
if notify.TemplateType == "markdown" {
// Markdown格式
content = fmt.Sprintf(`{
"msgtype": "markdown",
"markdown": {
"content": "# 自動化巡檢結果通知\n\n> ### 執(zhí)行作業(yè):%s\n> ### 執(zhí)行時間:%s\n> ### 執(zhí)行結果:%s\n\n### **異常項列表:**\n%s"
}
}`,
jobExecution.ExecutionJobName,
jobExecution.EndTime.Format("2006-01-02 15:04:05"),
summary,
formatAbnormalItems(abnormalItems))
} else {
// 文本格式
content = fmt.Sprintf(`{
"msgtype": "text",
"text": {
"content": "巡檢結果通知\n執(zhí)行作業(yè):%s\n執(zhí)行時間:%s\n執(zhí)行結果:%s\n\n異常項列表:\n%s"
}
}`,
jobExecution.ExecutionJobName,
jobExecution.EndTime.Format("2006-01-02 15:04:05"),
summary,
formatAbnormalItemsText(abnormalItems))
}
// 發(fā)送通知
ctx := context.Background()
sendParams := sender.SendParams{
NoticeType: notify.Type,
NoticeId: fmt.Sprintf("%d", notify.ID),
NoticeName: notify.Name,
Hook: notify.Address,
Content: content,
}
err = sender.Sender(&ctx, sendParams)
if err != nil {
global.GVA_LOG.Error("發(fā)送巡檢通知失敗", zap.Error(err))
return
}
global.GVA_LOG.Info("發(fā)送巡檢通知成功",
zap.String("jobName", jobExecution.ExecutionJobName),
zap.String("summary", summary))
}
(4)PDF導出是用wkhtmltopdf
實現,該包依賴服務器上的wkhtmltopdf
命令。
func (jobExecutionService *JobExecutionService) GeneratePDF(jobExecution *AutoInspection.JobExecution) (string, error) {
pdf, err := wkhtmltopdf.NewPDFGenerator()
if err != nil {
global.GVA_LOG.Error("PDF生成器初始化失敗", zap.Error(err))
return"", err
}
// 設置全局選項
pdf.Dpi.Set(300)
pdf.Orientation.Set(wkhtmltopdf.OrientationPortrait)
pdf.PageSize.Set(wkhtmltopdf.PageSizeA4)
pdf.MarginTop.Set(20)
pdf.MarginBottom.Set(20)
pdf.MarginLeft.Set(20)
pdf.MarginRight.Set(20)
// 渲染HTML模板
htmlContent, err := jobExecutionService.renderHTMLTemplate(jobExecution)
if err != nil {
global.GVA_LOG.Error("HTML模板渲染失敗", zap.Error(err))
return"", err
}
// 創(chuàng)建一個頁面并添加到生成器
page := wkhtmltopdf.NewPageReader(bytes.NewBufferString(htmlContent))
pdf.AddPage(page)
// 生成PDF
err = pdf.Create()
if err != nil {
return"", err
}
basePath := "uploads/pdf"
// 創(chuàng)建目錄(如果不存在)
if err = os.MkdirAll(basePath, 0755); err != nil {
global.GVA_LOG.Error("創(chuàng)建PDF保存目錄失敗", zap.Error(err))
return"", err
}
filename := generatePDFFileName(jobExecution)
filePath := filepath.Join(basePath, filename)
// 3. 保存PDF到文件
if err = os.WriteFile(filePath, pdf.Bytes(), 0644); err != nil {
global.GVA_LOG.Error("保存PDF文件失敗", zap.Error(err))
return"", err
}
....
return downloadURL, nil
}
以上就是實現巡檢的主要代碼。
最后
大部分企業(yè)雖然都有監(jiān)控告警,但是自動化巡檢在日常的運維工作中還是必要的,它可以聚合目前系統、集群存在的問題,避免遺漏告警信息。另外,在AI發(fā)展迅猛的今天,可以把AI也結合到自動化巡檢中,比如在巡檢中增加一些AI預測,AI故障診斷、AI根因分析等功能。