Source code for neurological_lrd_analysis.ml_baselines.feature_extraction

"""
Feature extraction for machine learning-based Hurst exponent estimation.

This module provides comprehensive feature extraction methods for time series data,
including statistical, spectral, wavelet, fractal, and biomedical-specific features
that are relevant for Hurst exponent estimation.

Author: Davian R. Chin (PhD Candidate in Biomedical Engineering, University of Reading, UK)
"""

import numpy as np
from typing import Dict, List, Optional, Tuple, Union
from dataclasses import dataclass
import warnings

# Lazy imports for optional dependencies
def _lazy_import_scipy():
    """Lazy import of scipy modules"""
    try:
        import scipy.stats as stats
        import scipy.signal as signal
        from scipy.fft import fft, fftfreq
        return stats, signal, fft, fftfreq
    except ImportError:
        warnings.warn("scipy not available, some features will be disabled")
        return None, None, None, None

def _lazy_import_sklearn():
    """Lazy import of sklearn modules"""
    try:
        from sklearn.preprocessing import StandardScaler
        from sklearn.feature_selection import SelectKBest, f_regression
        return StandardScaler, SelectKBest, f_regression
    except ImportError:
        warnings.warn("scikit-learn not available, ML features will be disabled")
        return None, None, None

def _lazy_import_pywt():
    """Lazy import of pywavelets"""
    try:
        import pywt
        return pywt
    except ImportError:
        warnings.warn("PyWavelets not available, wavelet features will be disabled")
        return None


@dataclass
class FeatureSet:
    """Container for extracted features."""
    statistical: Dict[str, float]
    spectral: Dict[str, float]
    wavelet: Dict[str, float]
    fractal: Dict[str, float]
    biomedical: Dict[str, float]
    combined: np.ndarray
    feature_names: List[str]


[docs] class TimeSeriesFeatureExtractor: """ Comprehensive feature extractor for time series data. Extracts statistical, spectral, wavelet, fractal, and biomedical-specific features that are relevant for Hurst exponent estimation. """
[docs] def __init__(self, include_spectral: bool = True, include_wavelet: bool = True, include_fractal: bool = True, include_biomedical: bool = True, sampling_rate: float = 250.0): """ Initialize the feature extractor. Parameters: ----------- include_spectral : bool Whether to include spectral features include_wavelet : bool Whether to include wavelet features include_fractal : bool Whether to include fractal features include_biomedical : bool Whether to include biomedical-specific features sampling_rate : float Sampling rate for biomedical feature extraction """ self.include_spectral = include_spectral self.include_wavelet = include_wavelet self.include_fractal = include_fractal self.sampling_rate = sampling_rate self.include_biomedical = include_biomedical # Lazy import dependencies self._scipy_stats, self._scipy_signal, self._fft, self._fftfreq = _lazy_import_scipy() self._pywt = _lazy_import_pywt() self._scaler, self._select_k_best, self._f_regression = _lazy_import_sklearn()
[docs] def extract_features(self, data: np.ndarray, true_hurst: Optional[float] = None) -> FeatureSet: """ Extract comprehensive features from time series data. Parameters: ----------- data : np.ndarray Time series data true_hurst : float, optional True Hurst exponent (for validation) Returns: -------- FeatureSet Extracted features """ # Remove NaN values data = data[~np.isnan(data)] if len(data) < 10: raise ValueError("Data too short for feature extraction") # Extract different feature types statistical_features = extract_statistical_features(data) spectral_features = {} wavelet_features = {} fractal_features = {} biomedical_features = {} if self.include_spectral and self._scipy_stats is not None: spectral_features = extract_spectral_features(data, self._scipy_signal, self._fft, self._fftfreq) if self.include_wavelet and self._pywt is not None: wavelet_features = extract_wavelet_features(data, self._pywt) if self.include_fractal: fractal_features = extract_fractal_features(data) if self.include_biomedical: biomedical_features = extract_biomedical_features(data, self.sampling_rate) # Combine all features all_features = {**statistical_features, **spectral_features, **wavelet_features, **fractal_features, **biomedical_features} # Convert to array feature_names = list(all_features.keys()) combined_features = np.array(list(all_features.values())) return FeatureSet( statistical=statistical_features, spectral=spectral_features, wavelet=wavelet_features, fractal=fractal_features, biomedical=biomedical_features, combined=combined_features, feature_names=feature_names )
[docs] def extract_statistical_features(data: np.ndarray) -> Dict[str, float]: """ Extract statistical features from time series data. Parameters: ----------- data : np.ndarray Time series data Returns: -------- Dict[str, float] Statistical features """ features = {} # Basic statistics features['mean'] = float(np.mean(data)) features['std'] = float(np.std(data)) features['var'] = float(np.var(data)) features['skewness'] = float(_calculate_skewness(data)) features['kurtosis'] = float(_calculate_kurtosis(data)) features['min'] = float(np.min(data)) features['max'] = float(np.max(data)) features['range'] = float(np.max(data) - np.min(data)) # Central moments features['moment_2'] = float(np.mean((data - np.mean(data))**2)) features['moment_3'] = float(np.mean((data - np.mean(data))**3)) features['moment_4'] = float(np.mean((data - np.mean(data))**4)) # Quantiles features['q25'] = float(np.percentile(data, 25)) features['q50'] = float(np.percentile(data, 50)) features['q75'] = float(np.percentile(data, 75)) features['iqr'] = float(np.percentile(data, 75) - np.percentile(data, 25)) # Autocorrelation features if len(data) > 1: autocorr_1 = _autocorrelation(data, lag=1) autocorr_2 = _autocorrelation(data, lag=2) autocorr_5 = _autocorrelation(data, lag=5) features['autocorr_1'] = float(autocorr_1) features['autocorr_2'] = float(autocorr_2) features['autocorr_5'] = float(autocorr_5) features['autocorr_decay'] = float(autocorr_1 - autocorr_5) # Trend features features['trend_slope'] = float(_trend_slope(data)) features['detrended_std'] = float(_detrended_std(data)) # Stationarity features features['adf_stat'] = float(_adf_statistic(data)) features['kpss_stat'] = float(_kpss_statistic(data)) return features
[docs] def extract_spectral_features(data: np.ndarray, scipy_signal, fft, fftfreq) -> Dict[str, float]: """ Extract spectral features from time series data. Parameters: ----------- data : np.ndarray Time series data scipy_signal : module scipy.signal module fft : function FFT function fftfreq : function FFT frequency function Returns: -------- Dict[str, float] Spectral features """ features = {} if scipy_signal is None or fft is None: return features try: # Power spectral density freqs, psd = scipy_signal.welch(data, nperseg=min(len(data)//4, 256)) # Spectral features features['spectral_centroid'] = float(np.sum(freqs * psd) / np.sum(psd)) features['spectral_bandwidth'] = float(np.sqrt(np.sum(((freqs - features['spectral_centroid'])**2) * psd) / np.sum(psd))) features['spectral_rolloff'] = float(_spectral_rolloff(freqs, psd)) features['spectral_flatness'] = float(_spectral_flatness(psd)) # Frequency band features delta_power = np.sum(psd[(freqs >= 0.5) & (freqs <= 4)]) theta_power = np.sum(psd[(freqs >= 4) & (freqs <= 8)]) alpha_power = np.sum(psd[(freqs >= 8) & (freqs <= 13)]) beta_power = np.sum(psd[(freqs >= 13) & (freqs <= 30)]) gamma_power = np.sum(psd[(freqs >= 30) & (freqs <= 100)]) total_power = np.sum(psd) features['delta_ratio'] = float(delta_power / total_power) features['theta_ratio'] = float(theta_power / total_power) features['alpha_ratio'] = float(alpha_power / total_power) features['beta_ratio'] = float(beta_power / total_power) features['gamma_ratio'] = float(gamma_power / total_power) # Dominant frequency features['dominant_freq'] = float(freqs[np.argmax(psd)]) # Spectral entropy features['spectral_entropy'] = float(_spectral_entropy(psd)) except Exception as e: warnings.warn(f"Spectral feature extraction failed: {e}") return features
[docs] def extract_wavelet_features(data: np.ndarray, pywt) -> Dict[str, float]: """ Extract wavelet features from time series data. Parameters: ----------- data : np.ndarray Time series data pywt : module PyWavelets module Returns: -------- Dict[str, float] Wavelet features """ features = {} if pywt is None: return features try: # Wavelet decomposition coeffs = pywt.wavedec(data, 'db4', level=4) # Energy features total_energy = sum(np.sum(c**2) for c in coeffs) for i, coeff in enumerate(coeffs): energy = np.sum(coeff**2) features[f'wavelet_energy_level_{i}'] = float(energy) features[f'wavelet_energy_ratio_level_{i}'] = float(energy / total_energy) # Wavelet variance for i, coeff in enumerate(coeffs[1:], 1): # Skip approximation coefficients features[f'wavelet_var_level_{i}'] = float(np.var(coeff)) features[f'wavelet_std_level_{i}'] = float(np.std(coeff)) # Wavelet entropy features['wavelet_entropy'] = float(_wavelet_entropy(coeffs)) # Dominant scale energy_by_level = [np.sum(coeffs[i]**2) for i in range(1, len(coeffs))] features['dominant_scale'] = float(np.argmax(energy_by_level) + 1) except Exception as e: warnings.warn(f"Wavelet feature extraction failed: {e}") return features
[docs] def extract_fractal_features(data: np.ndarray) -> Dict[str, float]: """ Extract fractal features from time series data. Parameters: ----------- data : np.ndarray Time series data Returns: -------- Dict[str, float] Fractal features """ features = {} try: # Detrended fluctuation analysis (simplified) features['dfa_alpha'] = float(_simple_dfa(data)) # Higuchi fractal dimension features['higuchi_fd'] = float(_higuchi_fractal_dimension(data)) # Katz fractal dimension features['katz_fd'] = float(_katz_fractal_dimension(data)) # Petrosian fractal dimension features['petrosian_fd'] = float(_petrosian_fractal_dimension(data)) # Hurst-like features features['hurst_rs'] = float(_rs_analysis(data)) features['hurst_variance'] = float(_variance_method(data)) except Exception as e: warnings.warn(f"Fractal feature extraction failed: {e}") return features
[docs] def extract_biomedical_features(data: np.ndarray, sampling_rate: float) -> Dict[str, float]: """ Extract biomedical-specific features from time series data. Parameters: ----------- data : np.ndarray Time series data sampling_rate : float Sampling rate in Hz Returns: -------- Dict[str, float] Biomedical features """ features = {} try: # Amplitude features features['amplitude_range'] = float(np.max(data) - np.min(data)) features['amplitude_std'] = float(np.std(data)) features['amplitude_rms'] = float(np.sqrt(np.mean(data**2))) # Zero crossing rate features['zero_crossing_rate'] = float(_zero_crossing_rate(data)) # Peak detection features peaks, _ = _find_peaks_simple(data) features['peak_count'] = float(len(peaks)) features['peak_rate'] = float(len(peaks) / (len(data) / sampling_rate)) if len(peaks) > 0: features['peak_amplitude_mean'] = float(np.mean(data[peaks])) features['peak_amplitude_std'] = float(np.std(data[peaks])) # Signal quality features features['snr_estimate'] = float(_estimate_snr(data)) features['signal_quality'] = float(_signal_quality_score(data)) # Biomedical frequency bands (assuming EEG-like data) if sampling_rate > 0: freqs = np.fft.fftfreq(len(data), 1/sampling_rate) fft_data = np.abs(np.fft.fft(data)) # Band power ratios delta_mask = (freqs >= 0.5) & (freqs <= 4) theta_mask = (freqs >= 4) & (freqs <= 8) alpha_mask = (freqs >= 8) & (freqs <= 13) beta_mask = (freqs >= 13) & (freqs <= 30) delta_power = np.sum(fft_data[delta_mask]) theta_power = np.sum(fft_data[theta_mask]) alpha_power = np.sum(fft_data[alpha_mask]) beta_power = np.sum(fft_data[beta_mask]) total_power = delta_power + theta_power + alpha_power + beta_power if total_power > 0: features['biomedical_delta_ratio'] = float(delta_power / total_power) features['biomedical_theta_ratio'] = float(theta_power / total_power) features['biomedical_alpha_ratio'] = float(alpha_power / total_power) features['biomedical_beta_ratio'] = float(beta_power / total_power) except Exception as e: warnings.warn(f"Biomedical feature extraction failed: {e}") return features
# Helper functions for feature extraction def _calculate_skewness(data: np.ndarray) -> float: """Calculate skewness.""" mean = np.mean(data) std = np.std(data) if std == 0: return 0.0 return np.mean(((data - mean) / std) ** 3) def _calculate_kurtosis(data: np.ndarray) -> float: """Calculate kurtosis.""" mean = np.mean(data) std = np.std(data) if std == 0: return 0.0 return np.mean(((data - mean) / std) ** 4) - 3 def _autocorrelation(data: np.ndarray, lag: int) -> float: """Calculate autocorrelation at given lag.""" if lag >= len(data): return 0.0 data_centered = data - np.mean(data) return np.corrcoef(data_centered[:-lag], data_centered[lag:])[0, 1] def _trend_slope(data: np.ndarray) -> float: """Calculate linear trend slope.""" x = np.arange(len(data)) return np.polyfit(x, data, 1)[0] def _detrended_std(data: np.ndarray) -> float: """Calculate standard deviation after detrending.""" x = np.arange(len(data)) trend = np.polyval(np.polyfit(x, data, 1), x) detrended = data - trend return np.std(detrended) def _adf_statistic(data: np.ndarray) -> float: """Simplified ADF statistic.""" # This is a simplified version - in practice, use scipy.stats diff = np.diff(data) return np.corrcoef(data[:-1], diff)[0, 1] def _kpss_statistic(data: np.ndarray) -> float: """Simplified KPSS statistic.""" # This is a simplified version - in practice, use scipy.stats cumsum = np.cumsum(data - np.mean(data)) return np.sum(cumsum**2) / (len(data) * np.var(data)) def _spectral_rolloff(freqs: np.ndarray, psd: np.ndarray, rolloff: float = 0.85) -> float: """Calculate spectral rolloff frequency.""" cumsum_psd = np.cumsum(psd) threshold = rolloff * cumsum_psd[-1] idx = np.where(cumsum_psd >= threshold)[0] return freqs[idx[0]] if len(idx) > 0 else freqs[-1] def _spectral_flatness(psd: np.ndarray) -> float: """Calculate spectral flatness.""" geometric_mean = np.exp(np.mean(np.log(psd + 1e-10))) arithmetic_mean = np.mean(psd) return geometric_mean / arithmetic_mean def _spectral_entropy(psd: np.ndarray) -> float: """Calculate spectral entropy.""" psd_norm = psd / np.sum(psd) psd_norm = psd_norm[psd_norm > 0] return -np.sum(psd_norm * np.log2(psd_norm)) def _wavelet_entropy(coeffs: List[np.ndarray]) -> float: """Calculate wavelet entropy.""" energies = [np.sum(c**2) for c in coeffs] total_energy = sum(energies) if total_energy == 0: return 0.0 probs = [e / total_energy for e in energies] return -sum(p * np.log2(p) for p in probs if p > 0) def _simple_dfa(data: np.ndarray) -> float: """Simplified DFA calculation.""" n = len(data) scales = np.logspace(0.5, np.log10(n//4), 10).astype(int) fluctuations = [] for scale in scales: # Detrend and calculate fluctuation n_windows = n // scale if n_windows < 2: continue fluctuations_at_scale = [] for i in range(n_windows): segment = data[i*scale:(i+1)*scale] if len(segment) > 1: x = np.arange(len(segment)) trend = np.polyval(np.polyfit(x, segment, 1), x) detrended = segment - trend fluctuations_at_scale.append(np.mean(detrended**2)) if fluctuations_at_scale: fluctuations.append(np.mean(fluctuations_at_scale)) if len(fluctuations) < 2: return 0.5 # Linear regression in log space log_scales = np.log(scales[:len(fluctuations)]) log_fluctuations = np.log(fluctuations) slope = np.polyfit(log_scales, log_fluctuations, 1)[0] return slope / 2 def _higuchi_fractal_dimension(data: np.ndarray) -> float: """Calculate Higuchi fractal dimension.""" n = len(data) k_max = min(10, n // 4) lengths = [] for k in range(1, k_max + 1): l_k = 0 for m in range(k): l_mk = 0 for i in range(1, (n - m) // k): l_mk += abs(data[m + i*k] - data[m + (i-1)*k]) l_mk = l_mk * (n - 1) / (((n - m) // k) * k) l_k += l_mk lengths.append(l_k / k) if len(lengths) < 2: return 1.0 # Linear regression k_values = np.arange(1, len(lengths) + 1) log_k = np.log(k_values) log_l = np.log(lengths) slope = np.polyfit(log_k, log_l, 1)[0] return -slope def _katz_fractal_dimension(data: np.ndarray) -> float: """Calculate Katz fractal dimension.""" n = len(data) if n < 2: return 1.0 # Calculate distances distances = np.abs(np.diff(data)) total_distance = np.sum(distances) if total_distance == 0: return 1.0 # Maximum distance from start max_distance = np.max(np.abs(data - data[0])) if max_distance == 0: return 1.0 return np.log(n) / (np.log(n) + np.log(max_distance / total_distance)) def _petrosian_fractal_dimension(data: np.ndarray) -> float: """Calculate Petrosian fractal dimension.""" n = len(data) if n < 2: return 1.0 # Binary sequence based on differences diff = np.diff(data) binary = (diff > 0).astype(int) # Count sign changes sign_changes = np.sum(np.diff(binary) != 0) if sign_changes == 0: return 1.0 return np.log(n) / (np.log(n) + np.log(n / (n + 0.4 * sign_changes))) def _rs_analysis(data: np.ndarray) -> float: """Simplified R/S analysis.""" n = len(data) if n < 4: return 0.5 # Calculate R/S for different scales scales = np.logspace(1, np.log10(n//4), 5).astype(int) rs_values = [] for scale in scales: n_windows = n // scale if n_windows < 2: continue rs_window = [] for i in range(n_windows): segment = data[i*scale:(i+1)*scale] if len(segment) > 1: mean_seg = np.mean(segment) cumsum = np.cumsum(segment - mean_seg) R = np.max(cumsum) - np.min(cumsum) S = np.std(segment) if S > 0: rs_window.append(R / S) if rs_window: rs_values.append(np.mean(rs_window)) if len(rs_values) < 2: return 0.5 # Linear regression log_scales = np.log(scales[:len(rs_values)]) log_rs = np.log(rs_values) slope = np.polyfit(log_scales, log_rs, 1)[0] return slope def _variance_method(data: np.ndarray) -> float: """Variance-based Hurst estimation.""" n = len(data) if n < 4: return 0.5 scales = np.logspace(1, np.log10(n//4), 5).astype(int) variances = [] for scale in scales: n_windows = n // scale if n_windows < 2: continue var_window = [] for i in range(n_windows): segment = data[i*scale:(i+1)*scale] if len(segment) > 1: var_window.append(np.var(segment)) if var_window: variances.append(np.mean(var_window)) if len(variances) < 2: return 0.5 # Linear regression log_scales = np.log(scales[:len(variances)]) log_vars = np.log(variances) slope = np.polyfit(log_scales, log_vars, 1)[0] return slope / 2 + 0.5 def _zero_crossing_rate(data: np.ndarray) -> float: """Calculate zero crossing rate.""" if len(data) < 2: return 0.0 sign_changes = np.sum(np.diff(np.sign(data)) != 0) return sign_changes / (len(data) - 1) def _find_peaks_simple(data: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: """Simple peak detection.""" if len(data) < 3: return np.array([]), np.array([]) peaks = [] for i in range(1, len(data) - 1): if data[i] > data[i-1] and data[i] > data[i+1]: peaks.append(i) return np.array(peaks), np.array([data[i] for i in peaks]) def _estimate_snr(data: np.ndarray) -> float: """Estimate signal-to-noise ratio.""" signal_power = np.var(data) # Simple noise estimation using high-frequency components if len(data) > 4: noise = np.diff(data, 2) # Second difference as noise proxy noise_power = np.var(noise) if noise_power > 0: return 10 * np.log10(signal_power / noise_power) return 0.0 def _signal_quality_score(data: np.ndarray) -> float: """Calculate signal quality score (0-1).""" if len(data) < 2: return 0.0 # Based on variance, range, and stationarity variance_score = min(1.0, np.var(data) / (np.max(data) - np.min(data))**2) range_score = min(1.0, (np.max(data) - np.min(data)) / (np.std(data) * 6)) # Stationarity (simplified) n = len(data) if n > 4: first_half = data[:n//2] second_half = data[n//2:] stationarity_score = 1.0 - abs(np.mean(first_half) - np.mean(second_half)) / (np.std(data) + 1e-10) stationarity_score = max(0.0, min(1.0, stationarity_score)) else: stationarity_score = 0.5 return (variance_score + range_score + stationarity_score) / 3