時間序列異常檢測:MSET-SPRT組合方法的原理和Python代碼實現(xiàn)
在異常檢測領(lǐng)域,尤其針對工業(yè)機械、核反應堆和網(wǎng)絡(luò)安全等復雜系統(tǒng),傳統(tǒng)方法往往難以有效處理高維度且相互關(guān)聯(lián)的數(shù)據(jù)流。多元狀態(tài)估計技術(shù)(MSET) 與序貫概率比檢驗(SPRT) 的組合方法在此類場景中展現(xiàn)出顯著優(yōu)勢。
MSET-SPRT是一種結(jié)合機器學習狀態(tài)估計與統(tǒng)計假設(shè)檢驗的混合技術(shù)框架,通過其高精度和穩(wěn)健性,被廣泛應用于關(guān)鍵任務(wù)系統(tǒng)的監(jiān)控與分析。該方法能夠?qū)崟r識別系統(tǒng)行為的微小偏差,為預防性維護和異常事件預警提供可靠依據(jù)。
MSET-SPRT理論基礎(chǔ)
多元狀態(tài)估計技術(shù)(MSET)原理
MSET作為一種非參數(shù)非線性回歸技術(shù),通過歷史觀測數(shù)據(jù)構(gòu)建系統(tǒng)正常狀態(tài)模型。其核心工作機制包括:
建立包含歷史正常系統(tǒng)狀態(tài)的記憶矩陣,作為參考基準;利用學習到的歷史狀態(tài)間關(guān)系計算加權(quán)組合,從而估計當前系統(tǒng)的預期狀態(tài);通過對比觀測值與估計值,計算系統(tǒng)行為偏差,為異常檢測提供基礎(chǔ)指標。
序貫概率比檢驗(SPRT)方法
SPRT是一種基于統(tǒng)計推斷的序貫假設(shè)檢驗方法,專用于確定系統(tǒng)行為偏差是否具有統(tǒng)計顯著性。其主要功能為:
持續(xù)評估殘差誤差(實際觀測值與模型估計值之間的差異),并根據(jù)預設(shè)的統(tǒng)計模型進行假設(shè)檢驗;當檢測到的偏差超過統(tǒng)計置信閾值時,系統(tǒng)能夠及時發(fā)出預警信號,同時控制虛警率在可接受范圍內(nèi)。
MSET-SPRT框架通過上述兩種技術(shù)的協(xié)同作用,為多元數(shù)據(jù)異常檢測提供了準確且高效的解決方案,特別適用于高維度、高相關(guān)性的時間序列數(shù)據(jù)分析。
Python實現(xiàn)MSET-SPRT異常檢測
下面通過一個精簡的示例來演示MSET-SPRT方法在Python中的實現(xiàn)過程。
導入必要的庫
import numpy as np
import scipy.stats as stats
import matplotlib.pyplot as plt
生成模擬數(shù)據(jù)集
構(gòu)建一個多元正態(tài)分布數(shù)據(jù)集,用于模擬正常運行狀態(tài)下的系統(tǒng)行為:
# Simulating normal system behavior (3 correlated sensors)
np.random.seed(42)
mean = [50, 75, 100] # Mean values for three sensors
cov = [[10, 5, 2], [5, 15, 3], [2, 3, 20]] # Covariance matrix
# Generate 500 normal operation samples
normal_data = np.random.multivariate_normal(mean, cov, size=500)
實現(xiàn)MSET算法
采用基于加權(quán)最近鄰的方法實現(xiàn)MSET算法,用于估計系統(tǒng)的預期行為:
class MSET:
def __init__(self, memory_matrix):
self.memory_matrix = memory_matrix # Store normal system states
def estimate(self, input_vector):
"""
Estimates the expected state based on historical data.
Uses nearest neighbors to compute weighted estimation.
"""
weights = np.exp(-np.linalg.norm(self.memory_matrix - input_vector, axis=1))
weights /= np.sum(weights)
return np.dot(weights, self.memory_matrix)
初始化MSET模型,將正常運行數(shù)據(jù)作為記憶矩陣:
# Initialize MSET with normal data as memory
mset_model = MSET(memory_matrix=normal_data)
計算殘差
計算實際觀測值與MSET估計值之間的殘差,作為異常檢測的基礎(chǔ):
# filepath: deephub\5\20250327\article.md
# Simulated test data (normal + some anomalies)
test_data = np.vstack([
np.random.multivariate_normal(mean, cov, size=450), # Normal
np.random.multivariate_normal([70, 50, 130], cov, size=50) # Anomalies
])
# Compute estimated values
estimated_data = np.array([mset_model.estimate(x) for x in test_data])
# Compute residuals
residuals = np.linalg.norm(test_data - estimated_data, axis=1)
應用SPRT進行異常檢測
基于似然比檢驗原理實現(xiàn)SPRT算法,用于判定殘差是否表示異常狀態(tài):
# Define thresholds for SPRT
alpha = 0.05 # False positive rate
beta = 0.05 # False negative rate
mu_0, sigma_0 = np.mean(residuals[:450]), np.std(residuals[:450]) # Normal behavior
mu_1 = mu_0 + 3 * sigma_0 # Anomalous mean shift
# SPRT decision function
def sprt_test(residual):
""" Sequential Probability Ratio Test for anomaly detection """
likelihood_ratio = stats.norm(mu_1, sigma_0).pdf(residual) / stats.norm(mu_0, sigma_0).pdf(residual)
return likelihood_ratio > (1 - beta) / alpha
# Apply SPRT
anomalies = np.array([sprt_test(res) for res in residuals])
# Plot results
plt.figure(figsize=(12, 5))
plt.plot(residuals, label="Residuals", color="blue")
plt.axhline(mu_1, color="red", linestyle="dashed", label="Anomaly Threshold")
plt.scatter(np.where(anomalies)[0], residuals[anomalies], color="red", label="Detected Anomalies", zorder=2)
plt.xlabel("Time")
plt.ylabel("Residual Magnitude")
plt.legend()
plt.title("MSET-SPRT Anomaly Detection")
plt.show()
結(jié)果分析與解釋
圖中數(shù)據(jù)可視化結(jié)果展示了MSET-SPRT方法的異常檢測效果:
藍色曲線表示系統(tǒng)狀態(tài)殘差時間序列,反映了實際觀測值與估計值之間的偏差大小;紅色虛線標示出異常檢測閾值,該閾值基于正常運行數(shù)據(jù)的統(tǒng)計特性計算得出;紅色標記點則代表被SPRT算法判定為異常的時間點,這些點的殘差值顯著高于正常波動范圍。
分析結(jié)果表明,MSET-SPRT方法能夠有效區(qū)分正常系統(tǒng)波動與異常行為,提供了一種可靠的多元時間序列異常檢測方案。該方法特別適用于需要高精度異常檢測的工業(yè)監(jiān)控、設(shè)備健康管理和網(wǎng)絡(luò)安全等領(lǐng)域。