# Source code for telemetry_anomdet.models.unsupervised.pca

# src/telemetry_anomdet/models/unsupervised/pca.py

"""
PCA-based anomaly detection.

This model learns a low dimensional subspace of nominal telemetry features
using Principal Component Analysis (PCA). Anomaly scores are computed as
the reconstruction error when projecting samples into the PCA subspace
and back into the original space.
"""

from __future__ import annotations

from typing import Optional

import numpy as np
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler

from ..base import BaseDetector
from ...feature_extraction import features

class PCAAnomaly(BaseDetector):
    """PCA-based anomaly detector over windowed telemetry.

    Input is a 3D windowed tensor of shape (n_windows, window_size,
    n_features). Each window is reduced internally to a flat statistical
    feature vector via ``features_stat()``, optionally standardized, and
    fitted with PCA on nominal data. A window's anomaly score is its
    squared reconstruction error after projecting into the PCA subspace
    and back; callers never handle the 3D-to-2D conversion themselves.

    Parameters
    ----------
    n_components : int or None, default=None
        Number of principal components to keep. ``None`` keeps all of
        them. To target a variance fraction, fit once and inspect
        ``model.explained_variance_ratio_``.
    scale : bool, default=True
        Standardize features (zero mean, unit variance) before PCA.
        Recommended when telemetry channels differ strongly in scale
        (e.g. voltage vs. temperature).
    percentile : float, default=95.0
        Percentile of the training reconstruction errors that sets the
        default anomaly threshold; 95.0 flags the top 5% of training
        windows as anomalous.

    Attributes (populated by fit)
    -----------------------------
    decision_scores_ : np.ndarray, shape (n_windows,)
        Reconstruction errors on the training windows.
    threshold_ : float
        Default cutoff taken at ``percentile`` of the training scores.
    labels_ : np.ndarray, shape (n_windows,)
        Training labels: 0 = normal, 1 = anomaly.
    model : sklearn.decomposition.PCA
        The fitted PCA estimator.
    scaler : sklearn.preprocessing.StandardScaler or None
        Fitted scaler when ``scale=True``; ``None`` otherwise.
    """

    def __init__(
        self,
        n_components: Optional[int] = None,
        scale: bool = True,
        percentile: float = 95.0,
    ):
        super().__init__(percentile=percentile)
        self.n_components = n_components
        self.scale = scale
        # Populated by fit(); None until then.
        self.model: Optional[PCA] = None
        self.scaler: Optional[StandardScaler] = None

    # ---- helpers ----

    def _flatten(self, X: np.ndarray) -> np.ndarray:
        """Collapse a windowed tensor to a 2D feature matrix.

        Parameters
        ----------
        X : np.ndarray, shape (n_windows, window_size, n_features)

        Returns
        -------
        np.ndarray, shape (n_windows, n_features * 6)
            Per-window statistics: mean, std, min, max, median, slope.
        """
        return features.features_stat(X)

    def _scale_fit(self, X2d: np.ndarray) -> np.ndarray:
        """Fit (or skip) the scaler on training features and transform them."""
        if not self.scale:
            self.scaler = None
            return X2d
        self.scaler = StandardScaler()
        return self.scaler.fit_transform(X2d)

    def _scale_transform(self, X2d: np.ndarray) -> np.ndarray:
        """Apply the already-fitted scaler, or pass through when disabled."""
        if not self.scale:
            return X2d
        if self.scaler is None:
            # scale=True but fit() never ran (or ran with scale=False).
            raise RuntimeError(
                "Scaler is not fitted. Was the model fitted with scale = True?"
            )
        return self.scaler.transform(X2d)

    def _reconstruction_error(self, Xs: np.ndarray) -> np.ndarray:
        """Per-window squared error after a PCA round-trip projection."""
        projected = self.model.transform(Xs)
        restored = self.model.inverse_transform(projected)
        residual = Xs - restored
        return np.sum(residual ** 2, axis=1)

    def fit(self, X: np.ndarray, y: np.ndarray | None = None) -> "PCAAnomaly":
        """Fit PCA on nominal telemetry windows.

        Parameters
        ----------
        X : np.ndarray, shape (n_windows, window_size, n_features)
            Windowed telemetry tensor from windowify().
        y : ignored
            Present only for API consistency.

        Returns
        -------
        self : PCAAnomaly
        """
        X = self._validate_X(X)
        scaled = self._scale_fit(self._flatten(X))
        self.model = PCA(n_components=self.n_components)
        self.model.fit(scaled)
        # Training-set scores drive threshold_ / labels_ via the base class.
        self._set_post_fit(self._reconstruction_error(scaled))
        return self

    def decision_function(self, X: np.ndarray) -> np.ndarray:
        """Score windows by PCA reconstruction error.

        Parameters
        ----------
        X : np.ndarray, shape (n_windows, window_size, n_features)

        Returns
        -------
        np.ndarray, shape (n_windows,)
            Reconstruction errors; higher means more anomalous.
        """
        self._require_fit()
        X = self._validate_X(X)
        scaled = self._scale_transform(self._flatten(X))
        return self._reconstruction_error(scaled)

    def _get_params(self) -> dict:
        """Constructor arguments needed to recreate this detector."""
        return {
            "n_components": self.n_components,
            "scale": self.scale,
            "percentile": self.percentile,
        }