# telemetry_anomdet/preprocessing.py
"""
Preprocessing utilities for telemetry data.
This module handles data cleaning, normalization,
and other transformations before feature extraction.
"""
# Long-form telemetry has one observation per row; wide form gives each variable its own column, with timestamps as the index
from typing import Optional
import pandas as pd
import fnmatch
# Canonical column names used throughout the preprocessing pipeline
_TS, _VAR, _VAL = "timestamp", "variable", "value"
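# Illustrative rows (hypothetical variable names), to show the two shapes referred to above:
#     long form: one row per reading, e.g. (2024-01-01T00:00:00Z, "Batt_V", 12.1)
#     wide form: one column per variable ("Batt_V", "Batt_T", ...), timestamps as the index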
def clean(df: pd.DataFrame, *, physical_bounds: Optional[dict] = None) -> pd.DataFrame:
    """
    Remove missing values, non-numeric readings, and physically impossible sensor values.
Arguments:
df (pd.DataFrame): Long form telemetry data with columns ['timestamp', 'variable', 'value'].
physical_bounds (dict, optional): Mapping of variable names or patterns to (min, max) valid ranges.
Example: {'Battery_Voltage': (0, 20), 'Battery_Temp': (-40, 85)}.
Returns:
pd.DataFrame: Cleaned dataset.
"""
# Work on a copy
df = df.copy()
    # Coerce readings to numeric (non-numeric entries become NaN), then drop rows with missing core fields
    df[_VAL] = pd.to_numeric(df[_VAL], errors = "coerce")
    df = df.dropna(subset = [_TS, _VAR, _VAL])
# Apply physical bounds
if physical_bounds:
# Mask that marks all rows as valid
mask = pd.Series(True, index = df.index)
# Loop over each variable pattern and its allowed (min, max) range
for pattern, bounds in physical_bounds.items():
if bounds is None:
# Skip None bounds
continue
# Bounds must be a 2 element tuple/list (min/max)
if not (isinstance(bounds, (tuple, list)) and len(bounds) == 2):
raise ValueError("Must be (min, max)")
lower, upper = bounds
# Select rows whose variable name matches the given pattern
sel = df[_VAR].map(lambda v: fnmatch.fnmatch(v, pattern))
# Mask out rows that fall below or above the valid range
if lower is not None:
mask &= ~(sel & (df[_VAL] < float(lower)))
if upper is not None:
mask &= ~(sel & (df[_VAL] > float(upper)))
        # Keep only rows that satisfy every bounds check
df = df[mask]
# Deterministic ordering (by timestamp and variable)
df = df.sort_values([_TS, _VAR]).reset_index(drop = True)
return df
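# Example usage of clean() (illustrative; variable names and limits are hypothetical):
#     bounds = {"Battery_Voltage": (0.0, 20.0), "Battery_Temp": (-40.0, 85.0), "Gyro_*": None}
#     cleaned = clean(raw_df, physical_bounds = bounds)
# Keys may be exact variable names or fnmatch-style glob patterns; a value of None skips that entry.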
def dedupe(df: pd.DataFrame) -> pd.DataFrame:
"""
Remove duplicate or retransmitted rows.
Arguments:
df (pd.DataFrame): Long form telemetry data with potential duplicates.
Returns:
pd.DataFrame: DataFrame with duplicates (timestamp, variable) removed.
"""
if not isinstance(df, pd.DataFrame):
raise TypeError("dedupe() expects a pandas DataFrame")
df = df.copy()
# Sort so duplicates are grouped together
df = df.sort_values([_TS, _VAR, _VAL])
# Drop exact duplicates
df = df.drop_duplicates()
    # Drop retransmits (same timestamp and variable, keep the last occurrence)
df = df.drop_duplicates(subset = [_TS, _VAR], keep = "last")
# Reset index for cleanliness before returning
return df.reset_index(drop = True)
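# Example usage of dedupe() (illustrative): exact duplicates are dropped; for conflicting
# retransmits (same timestamp and variable, different value) the last row after the value
# sort is kept, i.e. the highest reading.
#     deduped = dedupe(cleaned)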
def integrity_check(df: pd.DataFrame, *, require_utc: bool = True, require_sorted: bool = True) -> None:
"""
Verify timestamp format, timezone, and column consistency.
Arguments:
df (pd.DataFrame): Long form telemetry data.
require_utc (bool): If True, ensure timestamps are UTC.
require_sorted (bool): If True, ensure timestamps are sorted ascending.
Raises:
ValueError: If schema or ordering fails validation.
"""
if not isinstance(df, pd.DataFrame):
raise TypeError("integrity_check() expects a pandas DataFrame")
    # Required long-form columns must be present
    missing = [c for c in (_TS, _VAR, _VAL) if c not in df.columns]
    if missing:
        raise ValueError(f"Missing required columns: {missing}")
    # Extract the timestamp column
    ts = df[_TS]
    # Ensure datetime dtype (datetime64); coerce if necessary
if not pd.api.types.is_datetime64_any_dtype(ts):
try:
ts = pd.to_datetime(ts, errors = "raise", utc = require_utc)
except Exception as e:
raise ValueError("'timestamp' must be datetime format") from e
# UTC requirement
if require_utc:
if ts.dt.tz is None:
raise ValueError("Timestamps must be timezone aware UTC")
if str(ts.dt.tz) not in ("UTC", "UTC+00:00", "tzutc()"):
raise ValueError("Timestamps must be in UTC.")
# Sorted requirement (chronological order if required)
if require_sorted and not ts.is_monotonic_increasing:
raise ValueError("Timestamps must be ascending.")
# 'value' must be numeric
if not pd.api.types.is_numeric_dtype(df[_VAL]):
raise ValueError("'value' column must be numeric.")
return
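# Example usage of integrity_check() (illustrative; 'deduped' is a hypothetical frame name):
#     integrity_check(deduped)                       # strict: UTC, ascending timestamps
#     integrity_check(deduped, require_utc = False)  # allow timezone-naive timestamps
# The function returns None on success and raises ValueError on the first violation it finds.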
def resample(df: pd.DataFrame, *, rule: str = "5S", agg: str = "mean") -> pd.DataFrame:
"""
Resample irregularly spaced data to a uniform cadence.
Arguments:
df (pd.DataFrame): Long form telemetry data.
rule (str): Resample frequency ('1S', '5S', '1min').
agg (str): Aggregation method ('mean', 'median', etc.) when multiple values exist per interval.
Returns:
pd.DataFrame: Resampled dataset with regular time intervals.
"""
df = df.copy()
# Ensure proper timestamp dtype
df[_TS] = pd.to_datetime(df[_TS])
# Pivot to wide form
wide = df.pivot_table(index = _TS, columns = _VAR, values = _VAL)
# Resample
if agg == "mean":
wide = wide.resample(rule).mean()
elif agg == "median":
wide = wide.resample(rule).median()
elif agg == "min":
wide = wide.resample(rule).min()
elif agg == "max":
wide = wide.resample(rule).max()
else:
raise ValueError(f"Unsupported agg method: {agg}")
# Fill gaps with forward-fill then backfill
# This reduces NaNs in windowing
wide = wide.ffill().bfill()
# Melt back to long form
long = (
wide
.reset_index()
.melt(id_vars = [_TS], var_name=_VAR, value_name=_VAL)
.dropna(subset = [_VAL]) # remove variables missing entirely
.sort_values(_TS)
.reset_index(drop = True)
)
return long
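# Example usage of resample() (illustrative):
#     uniform = resample(deduped, rule = "5S", agg = "mean")
# Each variable is pivoted to its own column, aggregated per 5-second bin, forward- and
# back-filled, then melted back to the long form expected by the rest of the pipeline.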
def interpolate_gaps(df: pd.DataFrame, *, method: str = "ffill", limit: int = 1) -> pd.DataFrame:
    """
    Fill small missing gaps to ensure continuous time steps.
    Arguments:
        df (pd.DataFrame): Resampled telemetry data.
        method (str): Interpolation strategy ('ffill', 'bfill', or 'linear').
        limit (int): Maximum number of consecutive missing steps to fill.
    Returns:
        pd.DataFrame: Gap filled dataset.
    """
    df = df.copy()
    # Fill gaps per variable so values never bleed across sensors
    grouped = df.groupby(_VAR, group_keys = False)[_VAL]
    if method == "ffill":
        df[_VAL] = grouped.ffill(limit = limit)
    elif method == "bfill":
        df[_VAL] = grouped.bfill(limit = limit)
    elif method == "linear":
        df[_VAL] = grouped.apply(lambda s: s.interpolate(method = "linear", limit = limit))
    else:
        raise ValueError(f"Unsupported interpolation method: {method}")
    return df.dropna(subset = [_VAL]).reset_index(drop = True)
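# Example usage of interpolate_gaps() (illustrative):
#     filled = interpolate_gaps(uniform, method = "linear", limit = 2)
# Gaps are filled per variable; runs of missing values longer than `limit` stay NaN and
# are then dropped, so long outages are not silently bridged.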
def normalize_fit(df: pd.DataFrame, *, method: str = "zscore") -> dict:
    """
    Compute normalization parameters for each variable.
    Arguments:
        df (pd.DataFrame): Cleaned telemetry data (usually training subset).
        method (str): Normalization method ('zscore' or 'minmax').
    Returns:
        dict: Mapping {variable: (mean, std)} or {variable: (min, range)}.
    """
    params = {}
    # Per-variable statistics over the 'value' column
    for var, values in df.groupby(_VAR)[_VAL]:
        if method == "zscore":
            params[var] = (float(values.mean()), float(values.std()))
        elif method == "minmax":
            params[var] = (float(values.min()), float(values.max() - values.min()))
        else:
            raise ValueError(f"Unsupported normalization method: {method}")
    return params
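# Example usage of normalize_fit() (illustrative; applying the parameters is left to the
# feature-extraction stage, and the variable name below is hypothetical):
#     params = normalize_fit(train_df, method = "zscore")
#     mean, std = params["Battery_Voltage"]
#     z = (value - mean) / std if std else 0.0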
def pipeline(df: pd.DataFrame, *, physical_bounds: Optional[dict] = None, resample_rule: Optional[str] = "5S", resample_agg: str = "mean") -> pd.DataFrame:
    """
    Execute the minimal preprocessing pipeline for this dataset.
    Steps: clean -> dedupe -> integrity_check -> resample -> interpolate_gaps.
    Arguments:
        df (pd.DataFrame): Raw telemetry dataset.
        physical_bounds (dict, optional): Min/max physical limits per variable.
        resample_rule (str, optional): Resampling frequency (default '5S'); None skips resampling.
        resample_agg (str): Aggregation method (default 'mean').
    Returns:
        pd.DataFrame: Fully preprocessed dataset.
    """
# Remove nulls, non physical readings, and sort data
df = clean(df, physical_bounds = physical_bounds)
# Remove dupes or retransmits
df = dedupe(df)
    # Ensure timestamps and values are valid before reshaping
    integrity_check(df)
    # Resample to a uniform cadence, then fill any small remaining gaps
    if resample_rule is not None:
        df = resample(df, rule = resample_rule, agg = resample_agg)
        df = interpolate_gaps(df)
    return df
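# Minimal smoke test (illustrative only): a tiny synthetic long-form frame with hypothetical
# variable names, one out-of-range reading, and one retransmitted row.
if __name__ == "__main__":
    ts = pd.date_range("2024-01-01", periods = 6, freq = "2s", tz = "UTC")
    demo = pd.DataFrame({
        _TS: list(ts.repeat(2)) + [ts[0]],
        _VAR: ["Battery_Voltage", "Battery_Temp"] * 6 + ["Battery_Voltage"],
        _VAL: [12.1, 25.0, 12.2, 25.3, 999.0, 25.1, 12.0, 24.9, 12.3, 25.2, 12.1, 25.0, 12.1],
    })
    print(pipeline(demo, physical_bounds = {"Battery_Voltage": (0.0, 20.0)}))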