"""Manager class for efficiently handling multiple ReducedLikelihood instances."""
from typing import List, Tuple, Callable, Optional, Union
import numpy as np
from scipy import sparse
from ._reduced_likelihood import ReducedLikelihood
[docs]
class ReducedLikelihoodManager:
r"""Manager for multiple ReducedLikelihood instances with shared Jacobian computation.
This class manages multiple ReducedLikelihood objects and provides efficient
evaluation of the combined objective function, gradient, and Hessian by caching
Jacobian matrices to avoid redundant computation.
Parameters
----------
fwd_funcs : List[Tuple[Callable, dict]]
List of (forward_function, fwd_kwargs) tuples for each dataset
d_obs_list : List[np.ndarray]
List of observed data arrays, one per forward function
jacobian_fn : Callable
Function with signature: jacobian_fn(model, n_data, fwd_func, fwd_kwargs) -> np.ndarray
Should return Jacobian matrix of shape (n_data, n_params)
cases : Union[str, List[str]], default='none'
Covariance case(s). Either a single string applied to all, or a list
with one case per dataset. Options: 'none', 'scaled', 'kernel',
'spherical', 'diag', 'full'
Cd_refs : Optional[Union[np.ndarray, List[np.ndarray]]], default=None
Reference covariance matrix(ces). Either a single matrix applied to all,
or a list with one per dataset. Required for 'scaled' case.
For 'none' case: defaults to identity matrix if not provided.
Not used for 'spherical', 'diag', or 'full' cases.
kernels : optional
Kernel instance(s) for 'kernel' case. Either a single kernel
applied to all datasets using 'kernel', or a list with one per
dataset. Required for datasets with ``case='kernel'``.
copy_G : bool, default=False
If True, copy Jacobian matrices before assigning to ReducedLikelihood.G.
Use if jacobian_fn returns views that may be mutated.
track_stats : bool, default=False
If True, track statistics on cache hits and Jacobian evaluations
Attributes
----------
reduced_likelihoods : List[ReducedLikelihood]
The managed ReducedLikelihood instances
stats : dict
Statistics dictionary (if track_stats=True) with keys:
- 'n_cache_hits': Number of cache hits
- 'n_jacobian_evals': Total Jacobian evaluations across all datasets
Examples
--------
>>> import numpy as np
>>> from cofi.utils import ReducedLikelihoodManager
>>>
>>> # Define forward functions and data
>>> def fwd1(m): return np.array([m[0], m[1]])
>>> def fwd2(m): return np.array([m[0] + m[1]])
>>> fwd_funcs = [(fwd1, {}), (fwd2, {})]
>>> d_obs_list = [np.array([1.0, 2.0]), np.array([3.0])]
>>>
>>> # Define Jacobian function
>>> def jacobian(m, n_data, fwd, fwd_kwargs):
... # Compute Jacobian (simplified example)
... if n_data == 2:
... return np.array([[1, 0], [0, 1]])
... else:
... return np.array([[1, 1]])
>>>
>>> # Create manager
>>> manager = ReducedLikelihoodManager(
... fwd_funcs=fwd_funcs,
... d_obs_list=d_obs_list,
... jacobian_fn=jacobian,
... cases=['spherical', 'spherical'],
... track_stats=True
... )
>>>
>>> # Evaluate at a model
>>> model = np.array([1.5, 2.5])
>>> obj = manager.objective(model)
>>> grad = manager.gradient(model)
>>> hess = manager.hessian(model)
>>>
>>> # Check statistics
>>> print(manager.stats)
"""
def __init__(
self,
fwd_funcs: List[Tuple[Callable, dict]],
d_obs_list: List[np.ndarray],
jacobian_fn: Callable,
cases: Union[str, List[str]] = 'none',
Cd_refs: Optional[Union[np.ndarray, List[np.ndarray]]] = None,
kernels=None,
copy_G: bool = False,
track_stats: bool = False,
):
"""Initialize the ReducedLikelihoodManager."""
self.fwd_funcs = fwd_funcs
self.d_obs_list = d_obs_list
self.jacobian_fn = jacobian_fn
self.copy_G = copy_G
self.track_stats = track_stats
# Validate inputs
if len(fwd_funcs) != len(d_obs_list):
raise ValueError(
f"fwd_funcs and d_obs_list must have same length, "
f"got {len(fwd_funcs)} and {len(d_obs_list)}"
)
n_datasets = len(d_obs_list)
# Handle cases parameter (single or list)
if isinstance(cases, str):
cases_list = [cases] * n_datasets
else:
cases_list = list(cases)
if len(cases_list) != n_datasets:
raise ValueError(
f"cases list length {len(cases_list)} does not match "
f"number of datasets {n_datasets}"
)
# Handle Cd_refs parameter (single, list, or None)
if Cd_refs is None:
Cd_refs_list = [None] * n_datasets
elif isinstance(Cd_refs, np.ndarray):
Cd_refs_list = [Cd_refs] * n_datasets
else:
Cd_refs_list = list(Cd_refs)
if len(Cd_refs_list) != n_datasets:
raise ValueError(
f"Cd_refs list length {len(Cd_refs_list)} does not match "
f"number of datasets {n_datasets}"
)
# Handle kernels parameter (single, list, or None)
if kernels is None:
kernels_list = [None] * n_datasets
elif isinstance(kernels, list):
kernels_list = kernels
if len(kernels_list) != n_datasets:
raise ValueError(
f"kernels list length {len(kernels_list)} does not match "
f"number of datasets {n_datasets}"
)
else:
# Single kernel instance applied to all kernel datasets
kernels_list = [kernels] * n_datasets
# Validate that 'scaled' case has Cd_ref and 'kernel' has kernel
for i, case in enumerate(cases_list):
if case == 'scaled' and Cd_refs_list[i] is None:
raise ValueError(
f"Case 'scaled' for dataset {i} requires a Cd_ref. "
f"Pass Cd_refs argument with a reference covariance matrix."
)
if case == 'kernel' and kernels_list[i] is None:
raise ValueError(
f"Case 'kernel' for dataset {i} requires a kernel. "
f"Pass kernels argument with a kernel instance."
)
# Create ReducedLikelihood instances
self.reduced_likelihoods = []
for (fwd, fwd_kwargs), d_obs, case, Cd_ref, kernel in zip(
fwd_funcs, d_obs_list, cases_list, Cd_refs_list, kernels_list
):
rl = ReducedLikelihood(
data=d_obs,
forward_func=fwd,
fwd_kwargs=fwd_kwargs,
G=None, # Will be set by _ensure_jacobians
Cd_ref=Cd_ref,
case=case,
kernel=kernel,
)
self.reduced_likelihoods.append(rl)
# Precompute total kernel hyperparameters for model slicing
self._n_eta = sum(
getattr(rl.kernel, 'n_params', 0)
for rl in self.reduced_likelihoods
if rl.case == 'kernel'
)
self._has_kernel = self._n_eta > 0
# Cache management
self._cache_valid = False
self._last_model = None
# Statistics tracking
if self.track_stats:
self.stats = {
'n_cache_hits': 0,
'n_jacobian_evals': 0,
}
def _models_equal(self, m1, m2):
"""Check if two models are equal."""
if m1 is None or m2 is None:
return False
return np.array_equal(m1, m2)
def _get_m_phys(self, m):
"""Return the physical model parameters (strip kernel hyperparameters)."""
if self._has_kernel:
return m[:len(m) - self._n_eta]
return m
def _get_model_for_rl(self, m, rl):
"""Return the appropriate model slice for a given ReducedLikelihood."""
if rl.case == 'kernel':
return m # full [m_phys, eta]
else:
return self._get_m_phys(m) # physical model only
def _ensure_jacobians(self, model):
"""Ensure Jacobian matrices are computed and cached for the given model.
This method checks the cache and only recomputes Jacobians if the model
has changed. Jacobians are computed in a two-phase process to ensure
cache validity is only set after all computations succeed.
"""
m = np.asarray(model).ravel()
# Check cache
if self._cache_valid and self._models_equal(self._last_model, m):
if self.track_stats:
self.stats['n_cache_hits'] += 1
return
# Jacobian is computed w.r.t. physical model only (not kernel hyperparams)
m_phys = self._get_m_phys(m)
# Compute all Jacobians first (collect before assigning to protect cache)
computed = []
for i,(rl, (fwd, fwd_kwargs), d_obs) in enumerate(zip(
self.reduced_likelihoods, self.fwd_funcs, self.d_obs_list
)):
try:
G = self.jacobian_fn(m_phys, len(d_obs), fwd, fwd_kwargs)
except Exception as e:
raise RuntimeError(f"Jacobian failed for dataset {i}: {e}") from e
computed.append((rl, G))
# Assign Jacobians only after all computations succeed
for rl, G in computed:
if self.copy_G:
if sparse.issparse(G):
rl.G = G.copy()
else:
rl.G = np.asarray(G).copy()
else:
rl.G = G
rl._cache_model = None
rl._cache.clear()
# Update stats and cache only after success
if self.track_stats:
self.stats['n_jacobian_evals'] += len(computed)
self._last_model = m.copy()
self._cache_valid = True
[docs]
def objective(self, model: np.ndarray) -> float:
"""Evaluate the combined objective function (negative log-likelihood).
Parameters
----------
model : np.ndarray
Model parameters (``[m_phys, eta]`` when using ``kernel``)
Returns
-------
float
Negative log-likelihood value
"""
self._ensure_jacobians(model)
m = np.asarray(model).ravel()
neg_log_like = 0.0
for rl in self.reduced_likelihoods:
neg_log_like += -rl.log_likelihood(self._get_model_for_rl(m, rl))
return neg_log_like
[docs]
def gradient(self, model: np.ndarray) -> np.ndarray:
"""Evaluate the gradient of the objective function.
Parameters
----------
model : np.ndarray
Model parameters (``[m_phys, eta]`` when using ``kernel``)
Returns
-------
np.ndarray
Gradient vector
"""
self._ensure_jacobians(model)
m = np.asarray(model).ravel()
n_total = m.size
g = np.zeros(n_total, dtype=float)
for rl in self.reduced_likelihoods:
m_rl = self._get_model_for_rl(m, rl)
grad_rl = -rl.gradient(m_rl)
if grad_rl.size < n_total:
# Non-kernel dataset: contributes only to m_phys part
g[:grad_rl.size] += grad_rl
else:
g += grad_rl
return g
[docs]
def hessian(self, model: np.ndarray) -> np.ndarray:
"""Evaluate the Hessian of the objective function.
Parameters
----------
model : np.ndarray
Model parameters (``[m_phys, eta]`` when using ``kernel``)
Returns
-------
np.ndarray
Hessian matrix
"""
self._ensure_jacobians(model)
m = np.asarray(model).ravel()
n_total = m.size
H = np.zeros((n_total, n_total), dtype=float)
for rl in self.reduced_likelihoods:
m_rl = self._get_model_for_rl(m, rl)
hess_rl = -rl.hessian(m_rl)
n_rl = hess_rl.shape[0]
if n_rl < n_total:
# Non-kernel dataset: contributes only to m_phys block
H[:n_rl, :n_rl] += hess_rl
else:
H += hess_rl
return H
[docs]
def get_ml_covs(self, model: np.ndarray) -> List[Optional[np.ndarray]]:
"""Get maximum likelihood covariance estimates for all datasets.
Parameters
----------
model : np.ndarray
Model parameters (``[m_phys, eta]`` when using ``kernel``)
Returns
-------
List[Optional[np.ndarray]]
List of covariance matrices, one per dataset
"""
self._ensure_jacobians(model)
m = np.asarray(model).ravel()
return [rl.get_ml_cov(self._get_model_for_rl(m, rl))
for rl in self.reduced_likelihoods]
[docs]
def invalidate_cache(self):
"""Manually invalidate the Jacobian cache.
This forces recomputation on the next evaluation. Useful if the
forward function or Jacobian computation has changed.
"""
self._cache_valid = False
self._last_model = None
for rl in self.reduced_likelihoods:
rl._cache_model = None
rl._cache.clear()
[docs]
def get_stats(self) -> dict:
"""Get performance statistics (if track_stats=True).
Returns
-------
dict
Statistics dictionary with keys 'n_cache_hits' and 'n_jacobian_evals'
Raises
------
ValueError
If track_stats was not enabled at initialization
"""
if not self.track_stats:
raise ValueError("Statistics tracking was not enabled. Set track_stats=True.")
return self.stats.copy()