# Source code for telemetry_anomdet.models.base

# src/telemetry_anomdet/models/base.py

"""
BaseDetector: Shared interface for all anomaly detectors in telemetry_anomdet.

Design
------
All detectors in this library follow PyOD convention.

    fit(x): Train the model on the provided data, sets post-fit attributes.
    predict(x): Return binary labels (0 for normal, 1 for anomaly).
    decision_function(x): Return anomaly scores (higher means more anomalous).
    is_anomaly(x): Boolean mask. Supports runtime thresholding override.

Input Convention
----------------
All detectors accept X of shape (n_windows, window_size, n_features). Classical detectors flatten X internally
using feature_stat(). Sequence detectors (GDN, TranAD) consume X directly. 
The caller never manages this distinction.

Post-fit Attributes
-------------------
After fit(), every detector exposes:

    decision_scores_: np.ndarray (n_windows,) Training anomaly scores.
    threshold_:       float                   Score cutoff from training.
    labels_:          np.ndarray (n_windows,) Binary labels from training (0 normal, 1 anomaly).
"""

from __future__ import annotations

from abc import ABC, abstractmethod
from pathlib import Path
import pickle

import numpy as np

class BaseDetector(ABC):
    """
    Abstract base class for all anomaly detectors in telemetry-anomdet.

    Parameters
    ----------
    percentile : float, default = 95.0
        Percentile of the training anomaly scores used to derive
        ``threshold_``. A value of 95.0 means the most anomalous 5% of
        training windows are labelled anomalies by default. Must lie
        strictly between 0 and 100.

    Attributes (set after fit)
    --------------------------
    decision_scores_ : np.ndarray, shape (n_windows,)
        Anomaly scores on the training data; higher means more anomalous.
        Populated by ``_set_post_fit()`` at the end of ``fit()``.
    threshold_ : float
        Score cutoff taken at ``self.percentile`` of ``decision_scores_``.
        Default decision boundary for ``predict()`` and ``is_anomaly()``.
    labels_ : np.ndarray, shape (n_windows,)
        Binary training labels (0 = normal, 1 = anomaly) derived from
        ``decision_scores_`` and ``threshold_``.

    Notes
    -----
    Subclasses implement ``fit()`` and ``decision_function()``; the
    remaining methods are inherited and normally need no override.
    """

    def __init__(self, percentile: float = 95.0):
        # Reject degenerate cutoffs up front: 0 or 100 would flag every
        # window or none of them.
        if not 0.0 < percentile < 100.0:
            raise ValueError(
                f"percentile must be in (0, 100), got {percentile!r}."
            )
        self.percentile = percentile

        # Post-fit attributes; they stay None until fit() has run.
        self.decision_scores_: np.ndarray | None = None
        self.threshold_: float | None = None
        self.labels_: np.ndarray | None = None
[docs] @abstractmethod def fit(self, X: np.ndarray, y: np.ndarray | None = None) -> "BaseDetector": """ Train the detector on nominal telemetry windows. Subclasses must call ``self._set_post_fit(scores)`` at the end of this method, where ``scores`` are the training anomaly scores. This sets ``decision_scores_``, ``threshold_``, and ``labels_`` automatically. Parameters ---------- X : np.ndarray, shape (n_windows, window_size, n_features) Windowed telemetry tensor from ``windowify()``. y : np.ndarray, optional Ignored for unsupervised detectors. Present for API consistency. Returns ------- self : BaseDetector Fitted detector instance, for method chaining. """ ...
[docs] @abstractmethod def decision_function(self, X: np.ndarray) -> np.ndarray: """ Compute raw anomaly scores for each input window. Higher scores indicate greater anomaly likelihood. This is the core scoring method. All other scoring methods call this one. Parameters ---------- X : np.ndarray, shape (n_windows, window_size, n_features) Windowed telemetry tensor from ``windowify()``. Returns ------- scores : np.ndarray, shape (n_windows,) Anomaly score per window. """ ...
# Core scoring API (implemented here in the base class; shared by all subclasses)
[docs] def predict(self, X: np.ndarray): """ Classify each window as normal (0) or anomalous (1). Uses ``threshold_`` from training by default. For runtime threshold adjustment, use ``is_anomaly()`` with a ``threshold`` or ``percentile`` override instead. Parameters ---------- X : np.ndarray, shape (n_windows, window_size, n_features) Windowed telemetry tensor from ``windowify()``. Returns ------- labels : np.ndarray, shape (n_windows,) Binary labels. 0 = normal, 1 = anomaly. """ scores = self.decision_function(X) return (scores > self._get_threshold()).astype(int)
[docs] def is_anomaly(self, X: np.ndarray, *, threshold: float | None = None, percentile: float | None = None,) -> np.ndarray: """ Boolean anomaly mask with optional runtime threshold override. This is the human-in-the-loop hook. Operators can adjust sensitivity at inference time without retraining by passing a custom ``threshold`` or ``percentile``. If neither is provided, the training derived ``threshold_`` is used. Priority: ``threshold`` > ``percentile`` > ``threshold_``. Parameters ---------- X : np.ndarray, shape (n_windows, window_size, n_features) Windowed telemetry tensor from ``windowify()``. threshold : float, optional Fixed score cutoff. Windows with scores above this are anomalies. percentile : float, optional Compute the cutoff as this percentile of the current batch's scores. Useful when the operator wants to flag only the top N% of a given pass's windows rather than using training statistics. Must be in (0, 100). Returns ------- mask : np.ndarray of bool, shape (n_windows,) True where the window is anomalous. Examples -------- Default — use training threshold: flags = detector.is_anomaly(X3d) Tighten sensitivity (only top 2% flagged): flags = detector.is_anomaly(X3d, percentile=98.0) Fixed cutoff from operator feedback: flags = detector.is_anomaly(X3d, threshold=0.72) """ scores = self.decision_function(X) if threshold is not None: thr = float(threshold) elif percentile is not None: if not (0.0 < percentile < 100.0): raise ValueError( f"percentile must be in (0, 100), got {percentile!r}." ) thr = float(np.percentile(scores, percentile)) else: thr = self._get_threshold() return scores > thr
[docs] def score_samples(self, X: np.ndarray) -> np.ndarray: """ Alias for ``decision_function()``. Provided for compatibility with code that uses the sklearn/PyOD ``score_samples`` convention. Prefer ``decision_function()`` for new code. Parameters ---------- X : np.ndarray, shape (n_windows, window_size, n_features) Returns ------- scores : np.ndarray, shape (n_windows,) """ return self.decision_function(X)
def _set_post_fit(self, scores: np.ndarray) -> None: """ Set the three standard post-fit attributes from training scores. Call this at the end of ``fit()`` in every subclass: scores = self._compute_my_scores(X) self._set_post_fit(scores) return self Parameters ---------- scores : np.ndarray, shape (n_windows,) Anomaly scores on the training data. """ scores = np.asarray(scores, dtype=float) if scores.ndim != 1: raise ValueError( f"_set_post_fit() expects a 1D score array, got shape {scores.shape}." ) self.decision_scores_ = scores self.threshold_ = float(np.percentile(scores, self.percentile)) self.labels_ = (scores > self.threshold_).astype(int) def _get_threshold(self) -> float: """ Return the training derived threshold, raising if not yet fitted. """ if self.threshold_ is None: raise RuntimeError( f"{self.__class__.__name__} is not fitted. Call fit() first." ) return self.threshold_ def _require_fit(self) -> None: """ Raise if the detector has not been fitted. Use at the top of ``decision_function()`` in subclasses: def decision_function(self, X): self._require_fit() X = self._validate_X(X) ... """ if self.threshold_ is None: raise RuntimeError( f"{self.__class__.__name__} is not fitted. Call fit() first." ) def _validate_X(self, X: np.ndarray) -> np.ndarray: """ Validate and coerce input to a clean 3D float array. Enforces the (n_windows, window_size, n_features) input convention. Call at the top of both ``fit()`` and ``decision_function()``. Parameters ---------- X : array like Input to validate. Returns ------- X : np.ndarray, shape (n_windows, window_size, n_features) Validated array, coerced to float64. Raises ------ ValueError If X is not 3D, has 0 windows, or contains non-finite values. """ X = np.asarray(X, dtype = float) if X.ndim != 3: raise ValueError( f"Expected 3D input (n_windows, window_size, n_features), " f"got shape {X.shape}. " f"Pass the direct output of windowify()." ) if X.shape[0] == 0: raise ValueError( "Input X has 0 windows. 
Check that windowify() received " "enough data for at least one window." ) if X.shape[1] == 0: raise ValueError("Input X has window_size = 0.") if X.shape[2] == 0: raise ValueError("Input X has 0 features (channels).") if not np.isfinite(X).all(): n_bad = (~np.isfinite(X)).sum() raise ValueError( f"Input X contains {n_bad} non-finite value(s) (NaN or Inf). " f"Run interpolate_gaps() and normalize_fit() before windowing." ) return X
[docs] def save(self, path: str | Path) -> None: """ Serialize the fitted detector to disk using pickle. Parameters ---------- path : str or Path Destination file path. Conventionally ends in ``.pkl``. """ path = Path(path) path.parent.mkdir(parents=True, exist_ok=True) with path.open("wb") as f: pickle.dump(self, f)
[docs] @classmethod def load(cls, path: str | Path) -> "BaseDetector": """ Deserialize a detector from disk. Parameters ---------- path : str or Path Path to a pickled detector file created by ``save()``. Returns ------- detector : BaseDetector The deserialized, fitted detector instance. Notes ----- Only load files from trusted sources. """ path = Path(path) with path.open("rb") as f: return pickle.load(f)
def __repr__(self) -> str: fitted = self.threshold_ is not None params = self._get_params() param_str = ", ".join(f"{k}={v!r}" for k, v in params.items()) return ( f"{self.__class__.__name__}({param_str}, " f"fitted={fitted})" ) def _get_params(self) -> dict: """ Return constructor parameters for ``__repr__``. Subclasses can override this to include their own hyperparameters. By default returns ``{"percentile": self.percentile}``. """ return {"percentile": self.percentile}