"""
Pretrained Model System for ML Baseline Estimators.
This module provides a comprehensive system for creating, storing, and loading
pretrained ML models for Hurst exponent estimation. It includes model training
pipelines, efficient storage, metadata management, and inference systems.
Author: Davian R. Chin (PhD Candidate in Biomedical Engineering, University of Reading, UK)
"""
import numpy as np
import joblib
import json
import hashlib
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union, Any, Callable
from dataclasses import dataclass, asdict
from datetime import datetime
import warnings
import shutil
from enum import Enum
# Import ML components
from .ml_estimators import (
MLBaselineType, RandomForestEstimator, SVREstimator, GradientBoostingEstimator,
MLBaselineFactory, BaseMLEstimator, MLTrainingResult
)
from .feature_extraction import TimeSeriesFeatureExtractor
from .hyperparameter_optimization import optimize_all_estimators
class ModelStatus(Enum):
"""Status of pretrained models."""
TRAINING = "training"
TRAINED = "trained"
VALIDATED = "validated"
DEPRECATED = "deprecated"
FAILED = "failed"
@dataclass
class ModelMetadata:
"""Metadata for pretrained models."""
model_id: str
model_type: MLBaselineType
version: str
created_at: datetime
training_data_info: Dict[str, Any]
performance_metrics: Dict[str, float]
hyperparameters: Dict[str, Any]
feature_extractor_config: Dict[str, Any]
status: ModelStatus
file_path: str
checksum: str
description: Optional[str] = None
tags: Optional[List[str]] = None
author: Optional[str] = None
license: Optional[str] = None
@dataclass
class TrainingConfig:
"""Configuration for model training."""
model_type: MLBaselineType
hyperparameters: Dict[str, Any]
training_data_config: Dict[str, Any]
validation_split: float = 0.2
random_state: int = 42
optimize_hyperparameters: bool = False
optimization_trials: int = 50
description: Optional[str] = None
tags: Optional[List[str]] = None
[docs]
class PretrainedModelManager:
"""
Manager for pretrained ML models.
Handles creation, storage, loading, and management of pretrained models
for Hurst exponent estimation.
"""
[docs]
def __init__(self, models_dir: Union[str, Path] = "pretrained_models"):
"""
Initialize the pretrained model manager.
Parameters:
-----------
models_dir : str or Path
Directory to store pretrained models
"""
self.models_dir = Path(models_dir)
self.models_dir.mkdir(parents=True, exist_ok=True)
# Create subdirectories
self.models_path = self.models_dir / "models"
self.metadata_path = self.models_dir / "metadata"
self.cache_path = self.models_dir / "cache"
for path in [self.models_path, self.metadata_path, self.cache_path]:
path.mkdir(exist_ok=True)
# Load existing metadata
self._metadata_registry = self._load_metadata_registry()
def _load_metadata_registry(self) -> Dict[str, ModelMetadata]:
"""Load existing metadata registry."""
registry = {}
for metadata_file in self.metadata_path.glob("*.json"):
try:
with open(metadata_file, 'r') as f:
data = json.load(f)
# Convert datetime strings back to datetime objects
data['created_at'] = datetime.fromisoformat(data['created_at'])
data['model_type'] = MLBaselineType(data['model_type'])
data['status'] = ModelStatus(data['status'])
registry[data['model_id']] = ModelMetadata(**data)
except Exception as e:
warnings.warn(f"Failed to load metadata from {metadata_file}: {e}")
return registry
def _save_metadata(self, metadata: ModelMetadata) -> None:
"""Save model metadata."""
metadata_file = self.metadata_path / f"{metadata.model_id}.json"
# Convert to dict and handle datetime serialization
data = asdict(metadata)
data['created_at'] = metadata.created_at.isoformat()
data['model_type'] = metadata.model_type.value
data['status'] = metadata.status.value
with open(metadata_file, 'w') as f:
json.dump(data, f, indent=2)
def _generate_model_id(self, model_type: MLBaselineType,
training_config: Dict[str, Any]) -> str:
"""Generate unique model ID."""
# Create hash from model type and key config parameters
config_str = f"{model_type.value}_{training_config.get('random_state', 42)}"
if 'hyperparameters' in training_config:
config_str += f"_{str(sorted(training_config['hyperparameters'].items()))}"
hash_obj = hashlib.md5(config_str.encode())
return f"{model_type.value}_{hash_obj.hexdigest()[:8]}"
def _calculate_checksum(self, file_path: Path) -> str:
"""Calculate file checksum."""
hash_md5 = hashlib.md5()
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return hash_md5.hexdigest()
[docs]
def create_training_data(self,
hurst_values: List[float] = None,
lengths: List[int] = None,
n_samples_per_config: int = 100,
generators: List[str] = None,
contaminations: List[str] = None,
biomedical_scenarios: List[str] = None,
random_state: int = 42) -> Tuple[np.ndarray, np.ndarray, Dict[str, Any]]:
"""
Create comprehensive training dataset.
Parameters:
-----------
hurst_values : List[float], optional
Hurst values to generate
lengths : List[int], optional
Time series lengths
n_samples_per_config : int
Number of samples per configuration
generators : List[str], optional
Data generators to use
contaminations : List[str], optional
Contamination types
biomedical_scenarios : List[str], optional
Biomedical scenarios
random_state : int
Random state for reproducibility
Returns:
--------
Tuple[np.ndarray, np.ndarray, Dict[str, Any]]
(X, y, training_info) - features, targets, and metadata
"""
if hurst_values is None:
hurst_values = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]
if lengths is None:
lengths = [500, 1000, 2000]
if generators is None:
generators = ['fbm', 'fgn', 'arfima', 'mrw', 'fou']
if contaminations is None:
contaminations = ['none', 'noise', 'missing', 'artifacts']
if biomedical_scenarios is None:
biomedical_scenarios = ['eeg', 'ecg', 'respiratory']
print(f"Creating comprehensive training dataset...")
print(f" - Hurst values: {hurst_values}")
print(f" - Lengths: {lengths}")
print(f" - Generators: {generators}")
print(f" - Contaminations: {contaminations}")
print(f" - Biomedical scenarios: {biomedical_scenarios}")
print(f" - Samples per config: {n_samples_per_config}")
# Generate synthetic data
from ..benchmark_core.generation import generate_grid
samples = generate_grid(
hurst_values=hurst_values,
lengths=lengths,
contaminations=contaminations,
generators=generators,
biomedical_scenarios=biomedical_scenarios
)
# Extract features
extractor = TimeSeriesFeatureExtractor()
X = []
y = []
print("Extracting features...")
for i, sample in enumerate(samples):
if i % 100 == 0 and i > 0:
print(f" Processed {i}/{len(samples)} samples")
features = extractor.extract_features(sample.data, sample.true_hurst)
X.append(features.combined)
y.append(sample.true_hurst)
X = np.array(X)
y = np.array(y)
# Create training info
training_info = {
'n_samples': len(X),
'n_features': X.shape[1],
'hurst_range': (min(y), max(y)),
'hurst_values': hurst_values,
'lengths': lengths,
'generators': generators,
'contaminations': contaminations,
'biomedical_scenarios': biomedical_scenarios,
'n_samples_per_config': n_samples_per_config,
'random_state': random_state,
'feature_extractor_config': {
'include_spectral': extractor.include_spectral,
'include_wavelet': extractor.include_wavelet,
'include_fractal': extractor.include_fractal,
'include_biomedical': extractor.include_biomedical,
'sampling_rate': extractor.sampling_rate
}
}
print(f"Generated {len(X)} samples with {X.shape[1]} features")
return X, y, training_info
[docs]
def train_model(self,
training_config: TrainingConfig,
X: np.ndarray,
y: np.ndarray,
training_info: Dict[str, Any]) -> ModelMetadata:
"""
Train a model and save it as pretrained.
Parameters:
-----------
training_config : TrainingConfig
Training configuration
X : np.ndarray
Training features
y : np.ndarray
Training targets
training_info : Dict[str, Any]
Training dataset information
Returns:
--------
ModelMetadata
Metadata for the trained model
"""
print(f"Training {training_config.model_type.value} model...")
# Generate model ID
model_id = self._generate_model_id(training_config.model_type, asdict(training_config))
# Check if model already exists
if model_id in self._metadata_registry:
print(f"Model {model_id} already exists, skipping training")
return self._metadata_registry[model_id]
try:
# Create estimator
estimator = MLBaselineFactory.create_estimator(
training_config.model_type,
**training_config.hyperparameters
)
# Optimize hyperparameters if requested
if training_config.optimize_hyperparameters:
print("Optimizing hyperparameters...")
opt_results = optimize_all_estimators(
X, y,
estimator_types=[training_config.model_type.value],
n_trials=training_config.optimization_trials,
random_state=training_config.random_state
)
if training_config.model_type.value in opt_results:
best_params = opt_results[training_config.model_type.value].best_params
print(f"Best parameters: {best_params}")
# Update estimator with best parameters
estimator = MLBaselineFactory.create_estimator(
training_config.model_type,
**best_params
)
# Train model
print("Training model...")
training_result = estimator.train(
X, y,
validation_split=training_config.validation_split,
random_state=training_config.random_state
)
# Save model
model_path = self.models_path / f"{model_id}.joblib"
estimator.save_model(model_path)
# Calculate checksum
checksum = self._calculate_checksum(model_path)
# Create metadata
metadata = ModelMetadata(
model_id=model_id,
model_type=training_config.model_type,
version="1.0.0",
created_at=datetime.now(),
training_data_info=training_info,
performance_metrics={
'training_score': training_result.training_score,
'validation_score': training_result.validation_score,
'cv_mean': np.mean(training_result.cross_val_scores),
'cv_std': np.std(training_result.cross_val_scores)
},
hyperparameters=training_config.hyperparameters,
feature_extractor_config=training_info['feature_extractor_config'],
status=ModelStatus.TRAINED,
file_path=str(model_path),
checksum=checksum,
description=training_config.description,
tags=training_config.tags,
author="Davian R. Chin",
license="MIT"
)
# Save metadata
self._save_metadata(metadata)
self._metadata_registry[model_id] = metadata
print(f"Model {model_id} trained and saved successfully")
print(f" Training score: {training_result.training_score:.4f}")
print(f" Validation score: {training_result.validation_score:.4f}")
print(f" CV score: {np.mean(training_result.cross_val_scores):.4f} ± {np.std(training_result.cross_val_scores):.4f}")
return metadata
except Exception as e:
print(f"Failed to train model {model_id}: {e}")
# Create failed metadata
metadata = ModelMetadata(
model_id=model_id,
model_type=training_config.model_type,
version="1.0.0",
created_at=datetime.now(),
training_data_info=training_info,
performance_metrics={},
hyperparameters=training_config.hyperparameters,
feature_extractor_config=training_info['feature_extractor_config'],
status=ModelStatus.FAILED,
file_path="",
checksum="",
description=f"Failed: {str(e)}"
)
self._save_metadata(metadata)
self._metadata_registry[model_id] = metadata
raise
[docs]
def load_model(self, model_id: str) -> Tuple[BaseMLEstimator, ModelMetadata]:
"""
Load a pretrained model.
Parameters:
-----------
model_id : str
ID of the model to load
Returns:
--------
Tuple[BaseMLEstimator, ModelMetadata]
Loaded model and its metadata
"""
if model_id not in self._metadata_registry:
raise ValueError(f"Model {model_id} not found")
metadata = self._metadata_registry[model_id]
if metadata.status == ModelStatus.FAILED:
raise ValueError(f"Model {model_id} failed during training")
# Load model
model_path = Path(metadata.file_path)
if not model_path.exists():
raise FileNotFoundError(f"Model file {model_path} not found")
# Verify checksum
current_checksum = self._calculate_checksum(model_path)
if current_checksum != metadata.checksum:
warnings.warn(f"Checksum mismatch for model {model_id}")
# Create estimator and load model
estimator = MLBaselineFactory.create_estimator(metadata.model_type)
estimator.load_model(model_path)
return estimator, metadata
[docs]
def list_models(self,
model_type: Optional[MLBaselineType] = None,
status: Optional[ModelStatus] = None,
tags: Optional[List[str]] = None) -> List[ModelMetadata]:
"""
List available models with optional filtering.
Parameters:
-----------
model_type : MLBaselineType, optional
Filter by model type
status : ModelStatus, optional
Filter by status
tags : List[str], optional
Filter by tags
Returns:
--------
List[ModelMetadata]
List of matching models
"""
models = list(self._metadata_registry.values())
if model_type is not None:
models = [m for m in models if m.model_type == model_type]
if status is not None:
models = [m for m in models if m.status == status]
if tags is not None:
models = [m for m in models if m.tags and any(tag in m.tags for tag in tags)]
return sorted(models, key=lambda x: x.created_at, reverse=True)
[docs]
def get_best_model(self,
model_type: MLBaselineType,
metric: str = 'validation_score') -> Tuple[BaseMLEstimator, ModelMetadata]:
"""
Get the best performing model of a given type.
Parameters:
-----------
model_type : MLBaselineType
Type of model to get
metric : str
Metric to use for ranking
Returns:
--------
Tuple[BaseMLEstimator, ModelMetadata]
Best model and its metadata
"""
models = self.list_models(model_type=model_type, status=ModelStatus.TRAINED)
if not models:
raise ValueError(f"No trained models of type {model_type.value} found")
# Find best model by metric
best_model = max(models, key=lambda x: x.performance_metrics.get(metric, -np.inf))
return self.load_model(best_model.model_id)
[docs]
def predict(self,
model_id: str,
data: np.ndarray,
return_metadata: bool = False) -> Union[float, Tuple[float, ModelMetadata]]:
"""
Make prediction using a pretrained model.
Parameters:
-----------
model_id : str
ID of the model to use
data : np.ndarray
Time series data
return_metadata : bool
Whether to return model metadata
Returns:
--------
Union[float, Tuple[float, ModelMetadata]]
Prediction result and optionally metadata
"""
estimator, metadata = self.load_model(model_id)
# Extract features using the same configuration as training
extractor = TimeSeriesFeatureExtractor(**metadata.feature_extractor_config)
features = extractor.extract_features(data)
# Make prediction
prediction = estimator.predict(features.combined.reshape(1, -1))
if return_metadata:
return prediction.hurst_estimate, metadata
else:
return prediction.hurst_estimate
[docs]
def create_model_suite(self,
training_configs: List[TrainingConfig],
X: np.ndarray,
y: np.ndarray,
training_info: Dict[str, Any]) -> List[ModelMetadata]:
"""
Create a suite of pretrained models.
Parameters:
-----------
training_configs : List[TrainingConfig]
List of training configurations
X : np.ndarray
Training features
y : np.ndarray
Training targets
training_info : Dict[str, Any]
Training dataset information
Returns:
--------
List[ModelMetadata]
Metadata for all trained models
"""
print(f"Creating model suite with {len(training_configs)} models...")
results = []
for i, config in enumerate(training_configs):
print(f"\nTraining model {i+1}/{len(training_configs)}: {config.model_type.value}")
try:
metadata = self.train_model(config, X, y, training_info)
results.append(metadata)
except Exception as e:
print(f"Failed to train {config.model_type.value}: {e}")
continue
print(f"\nSuccessfully trained {len(results)} models")
return results
[docs]
def cleanup_models(self,
keep_best: bool = True,
max_models_per_type: int = 5) -> None:
"""
Clean up old or redundant models.
Parameters:
-----------
keep_best : bool
Whether to keep the best performing model of each type
max_models_per_type : int
Maximum number of models to keep per type
"""
print("Cleaning up models...")
for model_type in MLBaselineType:
models = self.list_models(model_type=model_type, status=ModelStatus.TRAINED)
if len(models) <= max_models_per_type:
continue
# Sort by performance
models.sort(key=lambda x: x.performance_metrics.get('validation_score', -np.inf), reverse=True)
# Keep best models
models_to_keep = models[:max_models_per_type]
models_to_remove = models[max_models_per_type:]
for model in models_to_remove:
print(f"Removing model {model.model_id}")
# Remove model file
model_path = Path(model.file_path)
if model_path.exists():
model_path.unlink()
# Remove metadata
metadata_file = self.metadata_path / f"{model.model_id}.json"
if metadata_file.exists():
metadata_file.unlink()
# Update registry
if model.model_id in self._metadata_registry:
del self._metadata_registry[model.model_id]
print("Cleanup completed")
[docs]
def create_default_training_configs() -> List[TrainingConfig]:
"""Create default training configurations for all model types."""
configs = []
# Random Forest configurations
configs.append(TrainingConfig(
model_type=MLBaselineType.RANDOM_FOREST,
hyperparameters={'n_estimators': 100, 'max_depth': 10, 'random_state': 42},
training_data_config={'n_samples_per_config': 50},
description="Random Forest with moderate complexity",
tags=['default', 'random_forest']
))
configs.append(TrainingConfig(
model_type=MLBaselineType.RANDOM_FOREST,
hyperparameters={'n_estimators': 200, 'max_depth': 15, 'random_state': 42},
training_data_config={'n_samples_per_config': 100},
optimize_hyperparameters=True,
optimization_trials=30,
description="Random Forest with hyperparameter optimization",
tags=['optimized', 'random_forest']
))
# SVR configurations
configs.append(TrainingConfig(
model_type=MLBaselineType.SVR,
hyperparameters={'C': 1.0, 'epsilon': 0.1, 'kernel': 'rbf'},
training_data_config={'n_samples_per_config': 50},
description="SVR with RBF kernel",
tags=['default', 'svr']
))
configs.append(TrainingConfig(
model_type=MLBaselineType.SVR,
hyperparameters={'C': 10.0, 'epsilon': 0.01, 'kernel': 'rbf'},
training_data_config={'n_samples_per_config': 100},
optimize_hyperparameters=True,
optimization_trials=30,
description="SVR with hyperparameter optimization",
tags=['optimized', 'svr']
))
# Gradient Boosting configurations
configs.append(TrainingConfig(
model_type=MLBaselineType.GRADIENT_BOOSTING,
hyperparameters={'n_estimators': 100, 'learning_rate': 0.1, 'max_depth': 3, 'random_state': 42},
training_data_config={'n_samples_per_config': 50},
description="Gradient Boosting with moderate complexity",
tags=['default', 'gradient_boosting']
))
configs.append(TrainingConfig(
model_type=MLBaselineType.GRADIENT_BOOSTING,
hyperparameters={'n_estimators': 200, 'learning_rate': 0.05, 'max_depth': 5, 'random_state': 42},
training_data_config={'n_samples_per_config': 100},
optimize_hyperparameters=True,
optimization_trials=30,
description="Gradient Boosting with hyperparameter optimization",
tags=['optimized', 'gradient_boosting']
))
return configs
[docs]
def create_pretrained_suite(models_dir: Union[str, Path] = "pretrained_models",
force_retrain: bool = False) -> PretrainedModelManager:
"""
Create a complete suite of pretrained models.
Parameters:
-----------
models_dir : str or Path
Directory to store models
force_retrain : bool
Whether to retrain existing models
Returns:
--------
PretrainedModelManager
Manager with trained models
"""
manager = PretrainedModelManager(models_dir)
# Check if models already exist
existing_models = manager.list_models(status=ModelStatus.TRAINED)
if existing_models and not force_retrain:
print(f"Found {len(existing_models)} existing models")
return manager
print("Creating comprehensive training dataset...")
# Create training data
X, y, training_info = manager.create_training_data(
hurst_values=[0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9],
lengths=[500, 1000, 2000],
n_samples_per_config=50,
generators=['fbm', 'fgn', 'arfima', 'mrw', 'fou'],
contaminations=['none', 'noise', 'missing', 'artifacts'],
biomedical_scenarios=['eeg', 'ecg', 'respiratory']
)
# Create training configurations
training_configs = create_default_training_configs()
# Train all models
results = manager.create_model_suite(training_configs, X, y, training_info)
print(f"\nSuccessfully created {len(results)} pretrained models")
return manager