Source code for telemetry_anomdet.models.unsupervised.kmeans

# src/telemetry_anomdet/models/unsupervised/kmeans.py

"""
K-Means clustering anomaly detection.

Each telemetry window is assigned to its nearest cluster centroid.
Anomaly scores are distances to the nearest centroid. Windows far from
any learned nominal operating mode score higher and are flagged as anomalies.
"""

from __future__ import annotations

from typing import Optional

import numpy as np
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler

from ..base import BaseDetector
from ...feature_extraction import features

class KMeansAnomaly(BaseDetector):
    """
    K-Means clustering based anomaly detector.

    Accepts 3D windowed input (n_windows, window_size, n_features) and
    flattens it internally via features_stat() before clustering. The caller
    never needs to manage this conversion.

    Parameters
    ----------
    n_clusters : int, default = 5
        Number of clusters (nominal operating modes) to learn. Each cluster
        represents a recurring pattern in the telemetry.
    scale : bool, default = False
        Apply StandardScaler before clustering. Enable when channels differ
        significantly in scale so distance calculations are not dominated by
        high-magnitude channels.
    percentile : float, default = 95.0
        Percentile of training centroid distances used to set the default
        anomaly threshold. 95.0 means the top 5% most distant training
        windows are labelled as anomalies.

    Attributes (set after fit)
    --------------------------
    decision_scores_ : np.ndarray, shape (n_windows,)
        Distance-to-nearest-centroid scores on training data.
    threshold_ : float
        Default anomaly cutoff derived from training scores at ``percentile``.
    labels_ : np.ndarray, shape (n_windows,)
        Binary anomaly labels on training data. 0 = normal, 1 = anomaly.
    model : sklearn.cluster.KMeans
        Fitted KMeans instance.
    centroids : np.ndarray, shape (n_clusters, n_features)
        Learned cluster centers in the (optionally scaled) feature space.
    scaler : sklearn.preprocessing.StandardScaler or None
        Fitted scaler when ``scale=True``, otherwise None.

    Notes
    -----
    ``decision_scores_``, ``threshold_`` and ``labels_`` are not assigned in
    this class; they are presumably populated by the base class when
    ``_set_post_fit()`` is called at the end of ``fit()`` (verify against
    BaseDetector).
    """

    def __init__(
        self,
        n_clusters: int = 5,
        scale: bool = False,
        percentile: float = 95.0,
    ):
        super().__init__(percentile=percentile)
        self.n_clusters = n_clusters
        self.scale = scale

        # Fit artifacts: set in fit()
        self.model: Optional[KMeans] = None
        self.centroids: Optional[np.ndarray] = None
        self.scaler: Optional[StandardScaler] = None

    # ---- helpers ----

    def _flatten(self, X: np.ndarray) -> np.ndarray:
        """
        Flatten 3D windowed tensor to 2D feature matrix via features_stat().

        Parameters
        ----------
        X : np.ndarray, shape (n_windows, window_size, n_features)

        Returns
        -------
        X2d : np.ndarray, shape (n_windows, n_features * 6)
            Statistical features per window: mean, std, min, max, median,
            slope.
        """
        return features.features_stat(X)

    def _scale_fit(self, X2d: np.ndarray) -> np.ndarray:
        """
        Fit the scaler on training features (when enabled) and transform them.

        When ``scale`` is False any scaler left over from a previous fit is
        discarded, so ``self.scaler`` always reflects the most recent fit.

        Parameters
        ----------
        X2d : np.ndarray, shape (n_windows, n_flat_features)

        Returns
        -------
        Xs : np.ndarray
            Standardised features when ``scale`` is True, otherwise ``X2d``
            unchanged.
        """
        if self.scale:
            self.scaler = StandardScaler()
            return self.scaler.fit_transform(X2d)
        self.scaler = None
        return X2d

    def _scale_transform(self, X2d: np.ndarray) -> np.ndarray:
        """
        Map features into the space the fitted model was trained in.

        The decision is driven by the *fitted* scaler rather than by the
        current value of ``self.scale``. If the model was trained on scaled
        features, new data must be scaled too, even if ``scale`` was switched
        off after fitting. Otherwise distances would be computed between
        unscaled samples and scaled centroids.

        Parameters
        ----------
        X2d : np.ndarray, shape (n_windows, n_flat_features)

        Returns
        -------
        Xs : np.ndarray
            Features transformed by the fitted scaler, or ``X2d`` unchanged
            when the model was fitted without scaling.

        Raises
        ------
        RuntimeError
            If ``scale`` is True but no scaler was fitted.
        """
        if self.scaler is not None:
            return self.scaler.transform(X2d)
        if self.scale:
            raise RuntimeError(
                "Scaler is not fitted. Was the model fitted with scale=True?"
            )
        return X2d

    def _centroid_distances(self, Xs: np.ndarray) -> np.ndarray:
        """
        Distance from each sample to its nearest centroid.

        ``KMeans.transform`` yields one distance per (sample, centroid) pair;
        the row-wise minimum is the distance to the closest centroid. Callers
        must ensure ``self.model`` has been fitted.
        """
        return self.model.transform(Xs).min(axis=1)

    # ---- BaseDetector interface ----

    def fit(self, X: np.ndarray, y: np.ndarray | None = None) -> "KMeansAnomaly":
        """
        Fit K-Means on nominal telemetry windows.

        Parameters
        ----------
        X : np.ndarray, shape (n_windows, window_size, n_features)
            Windowed telemetry tensor from windowify().
        y : ignored
            Present for API consistency.

        Returns
        -------
        self : KMeansAnomaly

        Raises
        ------
        ValueError
            If n_clusters < 1 or n_clusters > n_windows.
        """
        X = self._validate_X(X)
        X2d = self._flatten(X)

        n_windows = X2d.shape[0]
        if self.n_clusters < 1:
            raise ValueError("n_clusters must be at least 1.")
        if self.n_clusters > n_windows:
            raise ValueError(
                f"n_clusters ({self.n_clusters}) cannot exceed the number of "
                f"windows ({n_windows}). Reduce n_clusters or increase your data."
            )

        Xs = self._scale_fit(X2d)

        # Fixed seed and restart count keep repeated fits on the same data
        # reproducible.
        self.model = KMeans(
            n_clusters=self.n_clusters,
            n_init=10,
            random_state=0,
        )
        self.model.fit(Xs)
        self.centroids = self.model.cluster_centers_

        # Training scores are handed to the base class post-fit hook.
        scores = self._centroid_distances(Xs)
        self._set_post_fit(scores)
        return self

    def decision_function(self, X: np.ndarray) -> np.ndarray:
        """
        Compute distance-to-nearest-centroid scores for each window.

        Parameters
        ----------
        X : np.ndarray, shape (n_windows, window_size, n_features)

        Returns
        -------
        scores : np.ndarray, shape (n_windows,)
            Centroid distances. Higher = more anomalous.
        """
        self._require_fit()
        X = self._validate_X(X)
        X2d = self._flatten(X)
        Xs = self._scale_transform(X2d)
        return self._centroid_distances(Xs)

    # ---- KMeans specific method ----

    def predict_clusters(self, X: np.ndarray) -> np.ndarray:
        """
        Assign each window to its nearest cluster.

        Unique to KMeansAnomaly. Not part of the BaseDetector interface.
        Useful for understanding which nominal operating mode each window
        belongs to, independent of whether it is flagged as an anomaly.

        Parameters
        ----------
        X : np.ndarray, shape (n_windows, window_size, n_features)

        Returns
        -------
        cluster_labels : np.ndarray of int, shape (n_windows,)
            Cluster index in [0, n_clusters - 1] for each window.
        """
        self._require_fit()
        X = self._validate_X(X)
        X2d = self._flatten(X)
        Xs = self._scale_transform(X2d)
        return self.model.predict(Xs)

    # ---- Repr support ----

    def _get_params(self) -> dict:
        """Return the constructor hyperparameters as a name -> value mapping."""
        return {
            "n_clusters": self.n_clusters,
            "scale": self.scale,
            "percentile": self.percentile,
        }