Source code for nevo.core.state
"""
State Feature Extraction
=========================
This module computes state features that describe the current
optimisation landscape and guide operator selection.
"""
import numpy as np
from typing import Dict, Any, List
EPS_MAX = 1e10
[docs]
class StateFeatures:
"""
Extracts interpretable features from optimisation state.
Features:
- diversity: Spread of solutions in search space [0, 1]
- improvement_rate: Fraction of recent improvements [0, 1]
- convergence: Fitness homogeneity indicator [0, 1]
"""
[docs]
def __init__(self, history_length: int = 50):
"""
Parameters
----------
history_length : int
Number of recent timesteps to consider for improvement rate
"""
self.history_length = history_length
self.improvement_history: List[float] = []
[docs]
def compute(self, state: Dict[str, Any]) -> np.ndarray:
"""
Compute 3D feature vector from current state.
Parameters
----------
state : Dict[str, Any]
Current optimisation state containing:
- memory_vectors: np.ndarray of shape (MU, D)
- memory_fitness: np.ndarray of shape (MU,)
- f_default_worst: float (sentinel value for invalid solutions)
Returns
-------
features : np.ndarray
Feature vector [diversity, improvement_rate, convergence]
"""
features = {
"diversity": 0.5,
"improvement_rate": 0.5,
"convergence": 0.0,
}
# Extract state components
memory_fitness = state.get("memory_fitness", np.array([]))
memory_vectors = state.get("memory_vectors", np.array([]))
f_default_worst = state.get("f_default_worst", EPS_MAX)
# Filter valid solutions
valid_mask = memory_fitness < f_default_worst
n_valid = np.sum(valid_mask)
if n_valid < 2:
return np.array(
[
features["diversity"],
features["improvement_rate"],
features["convergence"],
]
)
valid_fitness = memory_fitness[valid_mask]
valid_vectors = memory_vectors[valid_mask]
# 1. DIVERSITY: Spread in search space
# Use average standard deviation across dimensions
std_per_dim = np.std(valid_vectors, axis=0)
avg_std = np.mean(std_per_dim)
# In [-1,1] space, std ranges from 0 (identical) to ~0.577 (uniform)
diversity = np.clip(avg_std / 0.6, 0.0, 1.0)
features["diversity"] = float(diversity)
# 2. IMPROVEMENT RATE: Fraction of recent improvements
if len(self.improvement_history) >= 10:
recent = self.improvement_history[-self.history_length :]
rate = np.mean(recent)
features["improvement_rate"] = float(rate)
# 3. CONVERGENCE: Fitness homogeneity
f_min = np.min(valid_fitness)
f_max = np.max(valid_fitness)
f_range = f_max - f_min
# Relative convergence
if abs(f_min) > 1e-12:
relative_range = f_range / abs(f_min)
# Small range = high convergence
convergence = 1.0 / (1.0 + 10.0 * relative_range)
features["convergence"] = float(convergence)
else:
# Near-zero fitness, use absolute range
convergence = 1.0 - np.clip(f_range / 100.0, 0.0, 1.0)
features["convergence"] = float(convergence)
return np.array(
[
features["diversity"],
features["improvement_rate"],
features["convergence"],
]
)
[docs]
def update_improvement_history(self, improved: bool):
"""
Update improvement history.
Parameters
----------
improved : bool
Whether the best solution improved this timestep
"""
self.improvement_history.append(1.0 if improved else 0.0)
# Keep only recent history
if len(self.improvement_history) > self.history_length * 2:
self.improvement_history = self.improvement_history[-self.history_length :]
[docs]
def reset(self):
"""Reset improvement history."""
self.improvement_history = []
[docs]
def compute_fitness_weighted_centre(state: Dict[str, Any]) -> np.ndarray:
"""
Compute fitness-weighted centroid from memory.
Uses rank-based weighting to emphasise better solutions.
Parameters
----------
state : Dict[str, Any]
optimisation state containing memory
Returns
-------
centre : np.ndarray
Fitness-weighted centroid in v-space
"""
memory_fitness = state.get("memory_fitness", np.array([]))
memory_vectors = state.get("memory_vectors", np.array([]))
f_default_worst = state.get("f_default_worst", EPS_MAX)
valid_mask = memory_fitness < f_default_worst
if not np.any(valid_mask):
# No valid solutions, return origin
dim = len(memory_vectors[0]) if len(memory_vectors) > 0 else 10
return np.zeros(dim)
valid_fitness = memory_fitness[valid_mask]
valid_vectors = memory_vectors[valid_mask]
# Rank-based weighting (better solutions get higher weight)
ranks = np.argsort(np.argsort(valid_fitness))
weights = 1.0 / (ranks + 1.0)
weights /= np.sum(weights)
return np.average(valid_vectors, axis=0, weights=weights)