Source code for telemetry_anomdet.feature_extraction.features

# telemetry_anomdet/features.py

"""
Feature extraction methods for telemetry analysis.

This module defines functions to compute relevant
features from preprocessed telemetry data.
"""

# Long form telemetry means one observation per row, wide means all variables get their own column, with timestamps as the index

import pandas as pd
import numpy as np

# Canonical column names
_TS, _VAR, _VAL = "timestamp", "variable", "value"

_REQURED_COLS = {_TS, _VAR, _VAL}

[docs] def pivot_wide(df: pd.DataFrame, *, variables = None) -> pd.DataFrame: """ Convert long form telemetry data to wide format. Arguments: df (pd.DataFrame): Long-form data with ['timestamp','variable','value']. variables (list[str], optional): Specific variables to include; all by default. Returns: pd.DataFrame: Wide table (timestamp index, variables as columns). """ if not isinstance(df, pd.DataFrame): raise TypeError("pivot_table() expects a pandas DataFrame") missing = _REQURED_COLS - set(df.columns) if missing: raise ValueError(f"pivot_wide() missing required columns: {missing}") data = df # (Optional): filter to a subset of variables/sensors if variables is not None: data = data[data[_VAR].isin(variables)] if data.empty: return pd.DataFrame(index = pd.Index([], name = _TS)) # Pivot from long to wide wide = ( data.pivot_table( index = _TS, columns = _VAR, values = _VAL, aggfunc = "last", ) .sort_index() ) # Drop channels that are entirely NaN wide = wide.dropna(axis = 1, how = "all") # Deterministic column order if wide.shape[1] > 0: wide = wide.loc[:, sorted(wide.columns)] # Tidy up the labels wide.index.name = _TS wide.columns.name = None return wide
[docs] def windowify(wide_df: pd.DataFrame, *, window_size: int = 50, step: int = 10) -> np.ndarray: """ Slice wide form telemetry data into overlapping windows. Arguments: wide_df (pd.DataFrame): Wide telemetry table. window_size (int): Number of samples per window. step (int): Step size between consecutive windows. Returns: np.ndarray: Array with shape (n_windows, window_size, n_features). """ if not isinstance(wide_df, pd.DataFrame): raise TypeError("windoify() expects a pandas DataFrame") # Convert to numeric matrix: shape (n_samples, n_features) values = wide_df.to_numpy(dtype = float) n_samples, n_features = values.shape windows = [] # Slide a fixed size window across time axis for start in range(0, n_samples - window_size + 1, step): end = start + window_size windows.append(values[start:end, :]) # If not enough data for a single window, return an empty array if not windows: return np.empty((0, window_size, n_features), dtype = float) # Stack list of (window_size, n_features) into (n_windows, window_size, n_features) return np.stack(windows, axis = 0)
[docs] def features_stat(X3d: np.ndarray) -> np.ndarray: """ Generate a 2D feature table from windowed telemetry. Arguments: X3d (np.ndarray): Windowed telemetry tensor with shape (n_windows, window_size, n_features), typically produced by `make_feature_table()`. Returns: np.ndarray: Shape (n_windows, n_features * 4). For each window and each variable, this function computes simple statistical features (mean, standard deviation, min, and max) along the time axis and concatenates them into a flat feature vector ready for downstream modeling. """ X3d = np.asarray(X3d) if X3d.ndim != 3: raise ValueError( f"features_stat() expects a 3D array " f"(n_windows, window_size, n_features), got {X3d.shape}" ) n_windows, window_size, n_features = X3d.shape if n_windows == 0 or n_features == 0: # If no data return an empty 2D array with 0 rows return np.empty((0, 0), dtype = float) # Compute stats along the time axis (axis = 1) means = np.nanmean(X3d, axis = 1) # (n_windows, n_features) stds = np.nanstd(X3d, axis = 1) # (n_windows, n_features) mins = np.nanmin(X3d, axis = 1) # (n_windows, n_features) maxs = np.nanmax(X3d, axis = 1) # (n_windows, n_features) medians = np.nanmedian(X3d, axis = 1) # (n_windows, n_features) # Linear slope per variable for each window times = np.arange(window_size) slopes = np.zeros((n_windows, n_features), dtype = float) for i in range(n_windows): for j in range(n_features): y = X3d[i, :, j] mask = np.isfinite(y) # If all NaN or fewer than 2 finite points, slope stays 0 if mask.sum() < 2: slopes[i, j] = 0.0 else: a, b = np.polyfit(times[mask], y[mask], 1) slopes[i, j] = a # Concatenate into a single feature vector per window: [mean | std | min | max] X_feat = np.concatenate([means, stds, mins, maxs, medians, slopes], axis = 1) # Replace any remaining NaN/inf with column means col_means = np.nanmean(X_feat, axis = 0) col_means = np.where(np.isfinite(col_means), col_means, 0.0) bad = ~np.isfinite(X_feat) if bad.any(): X_feat[bad] = col_means[np.where(bad)[1]] return X_feat
[docs] def make_feature_table(df: pd.DataFrame, *, variables = None, window_size = 50, step = 10) -> np.ndarray: """ Generate a complete feature table from preprocessed telemetry. Arguments: df (pd.DataFrame): Preprocessed long-form telemetry data. variables (list[str], optional): Subset of sensors to include. window_size (int): Window length (default 50 samples). step (int): Step size between windows. Returns: np.ndarray: Shape (n_windows, window_size, n_features). Ready for downstream modeling or for passing into a future features_stat() to get a 2D feature table. """ # Convert long form to wide form wide = pivot_wide(df, variables = variables) # If no usable data remains, stop early if wide.empty: return np.empty((0, window_size, 0)) # Slice the wide data into fixed length windows X3d = windowify(wide, window_size = window_size, step = step) # Return the 3D telemetry tensor ready for feature extraction return X3d