Source code for gammapy.modeling.parameter

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Model parameter classes."""
import collections.abc
import copy
import itertools
import logging
import numpy as np
from astropy import units as u
from astropy.table import Table
from gammapy.utils.interpolation import interpolation_scale

__all__ = ["Parameter", "Parameters"]

log = logging.getLogger(__name__)


def _get_parameters_str(parameters):
    str_ = ""

    for par in parameters:
        if par.name == "amplitude":
            value_format, error_format = "{:10.2e}", "{:7.1e}"
        else:
            value_format, error_format = "{:10.3f}", "{:7.2f}"

        line = "\t{:21} {:8}: " + value_format + "\t {} {:<12s}\n"

        if par._link_label_io is not None:
            name = par._link_label_io
        else:
            name = par.name

        if par.frozen:
            frozen, error = "(frozen)", "\t\t"
        else:
            frozen = ""
            try:
                error = "+/- " + error_format.format(par.error)
            except AttributeError:
                error = ""
        str_ += line.format(name, frozen, par.value, error, par.unit)
    return str_.expandtabs(tabsize=2)
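
# Illustrative note (not part of the original docs, derived from the code above):
# each parameter is rendered on one line containing the (possibly linked) name,
# the value in "{:10.3f}" format ("{:10.2e}" for a parameter named "amplitude"),
# and a "+/- <error>" term; frozen parameters show "(frozen)" and no error.
# Tabs are expanded with tabsize=2.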


class Parameter:
    """A model parameter.

    Note that the parameter value has been split into
    a factor and scale like this::

        value = factor x scale

    Users should interact with the ``value``, ``quantity`` or ``min`` and ``max``
    properties and consider ``factor`` and ``scale`` an implementation detail.

    That was introduced for numerical stability in parameter and error
    estimation methods. Only in the Gammapy optimiser interface do we interact
    with the ``factor``, ``factor_min`` and ``factor_max`` properties,
    i.e. the optimiser "sees" the well-scaled problem.

    Parameters
    ----------
    name : str
        Name
    value : float or `~astropy.units.Quantity`
        Value
    scale : float, optional
        Scale (sometimes used in fitting)
    unit : `~astropy.units.Unit` or str, optional
        Unit
    min : float, optional
        Minimum (sometimes used in fitting)
    max : float, optional
        Maximum (sometimes used in fitting)
    frozen : bool, optional
        Frozen? (used in fitting)
    error : float
        Parameter error
    scan_min : float
        Minimum value for the parameter scan. Overwrites scan_n_sigma.
    scan_max : float
        Maximum value for the parameter scan. Overwrites scan_n_sigma.
    scan_n_values : int
        Number of values to be used for the parameter scan.
    scan_n_sigma : int
        Number of sigmas to scan.
    scan_values : `numpy.array`
        Scan values. Overwrites all of the scan keywords before.
    scale_method : {'scale10', 'factor1', None}
        Method used to set ``factor`` and ``scale``
    interp : {"lin", "sqrt", "log"}
        Parameter scaling to use for the scan.
    is_norm : bool
        Whether the parameter represents the flux norm of the model.
    """

    def __init__(
        self,
        name,
        value,
        unit="",
        scale=1,
        min=np.nan,
        max=np.nan,
        frozen=False,
        error=0,
        scan_min=None,
        scan_max=None,
        scan_n_values=11,
        scan_n_sigma=2,
        scan_values=None,
        scale_method="scale10",
        interp="lin",
        is_norm=False,
    ):
        if not isinstance(name, str):
            raise TypeError(f"Name must be string, got '{type(name)}' instead")

        self._name = name
        self._link_label_io = None
        self.scale = scale
        self.min = min
        self.max = max
        self.frozen = frozen
        self._error = error
        self._is_norm = is_norm
        self._type = None

        # TODO: move this to a setter method that can be called from `__set__` also!
        # Having it here is bad: behaviour not clear if Quantity and `unit` is passed.
        if isinstance(value, u.Quantity) or isinstance(value, str):
            val = u.Quantity(value)
            self.value = val.value
            self.unit = val.unit
        else:
            self.value = float(value)
            self.unit = unit

        self.scan_min = scan_min
        self.scan_max = scan_max
        self.scan_values = scan_values
        self.scan_n_values = scan_n_values
        self.scan_n_sigma = scan_n_sigma
        self.interp = interp
        self.scale_method = scale_method

    def __get__(self, instance, owner):
        if instance is None:
            return self

        par = instance.__dict__[self.name]
        par._type = getattr(instance, "type", None)
        return par

    def __set__(self, instance, value):
        if isinstance(value, Parameter):
            instance.__dict__[self.name] = value
        else:
            par = instance.__dict__[self.name]
            raise TypeError(f"Cannot assign {value!r} to parameter {par!r}")

    def __set_name__(self, owner, name):
        if not self._name == name:
            raise ValueError(f"Expected parameter name '{name}', got {self._name}")

    @property
    def is_norm(self):
        """Whether the parameter represents the norm of the model"""
        return self._is_norm

    @property
    def type(self):
        return self._type

    @property
    def error(self):
        return self._error

    @error.setter
    def error(self, value):
        self._error = float(u.Quantity(value, unit=self.unit).value)

    @property
    def name(self):
        """Name (str)."""
        return self._name

    @property
    def factor(self):
        """Factor (float)."""
        return self._factor

    @factor.setter
    def factor(self, val):
        self._factor = float(val)

    @property
    def scale(self):
        """Scale (float)."""
        return self._scale

    @scale.setter
    def scale(self, val):
        self._scale = float(val)

    @property
    def unit(self):
        """Unit (`~astropy.units.Unit`)."""
        return self._unit

    @unit.setter
    def unit(self, val):
        self._unit = u.Unit(val)

    @property
    def min(self):
        """Minimum (float)."""
        return self._min

    @min.setter
    def min(self, val):
        "Astropy Table has masked values for NaN. Replacing with np.nan."
        if isinstance(val, np.ma.core.MaskedConstant):
            self._min = np.nan
        else:
            self._min = float(val)

    @property
    def factor_min(self):
        """Factor min (float).

        This ``factor_min = min / scale`` is for the optimizer interface.
        """
        return self.min / self.scale

    @property
    def max(self):
        """Maximum (float)."""
        return self._max

    @max.setter
    def max(self, val):
        "Astropy Table has masked values for NaN. Replacing with np.nan."
        if isinstance(val, np.ma.core.MaskedConstant):
            self._max = np.nan
        else:
            self._max = float(val)

    @property
    def factor_max(self):
        """Factor max (float).

        This ``factor_max = max / scale`` is for the optimizer interface.
        """
        return self.max / self.scale

    @property
    def scale_method(self):
        """Method used to set ``factor`` and ``scale``"""
        return self._scale_method

    @scale_method.setter
    def scale_method(self, val):
        if val not in ["scale10", "factor1"] and val is not None:
            raise ValueError(f"Invalid method: {val}")
        self._scale_method = val

    @property
    def frozen(self):
        """Frozen? (used in fitting) (bool)."""
        return self._frozen

    @frozen.setter
    def frozen(self, val):
        if val in ["True", "False"]:
            # Interpret the strings "True"/"False"; note that bool("False") is True.
            val = val == "True"

        if not isinstance(val, bool) and not isinstance(val, np.bool_):
            raise TypeError(f"Invalid type: {val}, {type(val)}")

        self._frozen = val

    @property
    def value(self):
        """Value = factor x scale (float)."""
        return self._factor * self._scale

    @value.setter
    def value(self, val):
        self.factor = float(val) / self._scale

    @property
    def quantity(self):
        """Value times unit (`~astropy.units.Quantity`)."""
        return self.value * self.unit

    @quantity.setter
    def quantity(self, val):
        val = u.Quantity(val)

        if not val.unit.is_equivalent(self.unit):
            raise u.UnitConversionError(
                f"Unit must be equivalent to {self.unit} for parameter {self.name}"
            )

        self.value = val.value
        self.unit = val.unit

    # TODO: possibly allow to set this independently
    @property
    def conf_min(self):
        """Confidence min value (`float`)

        Returns parameter minimum if defined, else the scan_min.
        """
        if not np.isnan(self.min):
            return self.min
        else:
            return self.scan_min

    # TODO: possibly allow to set this independently
    @property
    def conf_max(self):
        """Confidence max value (`float`)

        Returns parameter maximum if defined, else the scan_max.
        """
        if not np.isnan(self.max):
            return self.max
        else:
            return self.scan_max

    @property
    def scan_min(self):
        """Stat scan min"""
        if self._scan_min is None:
            return self.value - self.error * self.scan_n_sigma

        return self._scan_min

    @property
    def scan_max(self):
        """Stat scan max"""
        if self._scan_max is None:
            return self.value + self.error * self.scan_n_sigma

        return self._scan_max

    @scan_min.setter
    def scan_min(self, value):
        """Stat scan min setter"""
        self._scan_min = value

    @scan_max.setter
    def scan_max(self, value):
        """Stat scan max setter"""
        self._scan_max = value

    @property
    def scan_n_sigma(self):
        """Stat scan n sigma"""
        return self._scan_n_sigma

    @scan_n_sigma.setter
    def scan_n_sigma(self, n_sigma):
        """Stat scan n sigma"""
        self._scan_n_sigma = int(n_sigma)

    @property
    def scan_values(self):
        """Stat scan values (`~numpy.ndarray`)"""
        if self._scan_values is None:
            scale = interpolation_scale(self.interp)
            parmin, parmax = scale([self.scan_min, self.scan_max])
            values = np.linspace(parmin, parmax, self.scan_n_values)
            return scale.inverse(values)

        return self._scan_values

    @scan_values.setter
    def scan_values(self, values):
        """Set scan values"""
        self._scan_values = values
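
    # Illustrative sketch of the default scan grid (numbers are examples, not from
    # the original docs): with the defaults interp="lin", scan_n_sigma=2 and
    # scan_n_values=11, and no explicit scan_min/scan_max,
    #
    # >>> p = Parameter("index", value=2.0, error=0.1)
    # >>> p.scan_values
    #
    # returns 11 linearly spaced values from 1.8 to 2.2, i.e. value -/+ 2 * error.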
    def check_limits(self):
        """Emit a warning if the value is outside the min/max range"""
        if not self.frozen:
            if (~np.isnan(self.min) and (self.value <= self.min)) or (
                ~np.isnan(self.max) and (self.value >= self.max)
            ):
                log.warning(
                    f"Value {self.value} is outside bounds [{self.min}, {self.max}]"
                    f" for parameter '{self.name}'"
                )
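
    # Illustrative sketch (not part of the original docs): a value at or beyond a
    # defined bound only triggers a log warning, it is not clipped.
    #
    # >>> p = Parameter("index", value=5.0, min=1.0, max=4.0)
    # >>> p.check_limits()
    # logs: "Value 5.0 is outside bounds [1.0, 4.0] for parameter 'index'"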
    def __repr__(self):
        return (
            f"{self.__class__.__name__}(name={self.name!r}, value={self.value!r}, "
            f"factor={self.factor!r}, scale={self.scale!r}, unit={self.unit!r}, "
            f"min={self.min!r}, max={self.max!r}, frozen={self.frozen!r}, id={hex(id(self))})"
        )
    def copy(self):
        """A deep copy"""
        return copy.deepcopy(self)
    def update_from_dict(self, data):
        """Update parameters from a dict.

        Protection against changing parameter model, type, name.
        """
        keys = ["value", "unit", "min", "max", "frozen"]
        for k in keys:
            setattr(self, k, data[k])
    def to_dict(self):
        """Convert to dict."""
        output = {
            "name": self.name,
            "value": self.value,
            "unit": self.unit.to_string("fits"),
            "error": self.error,
            "min": self.min,
            "max": self.max,
            "frozen": self.frozen,
            "interp": self.interp,
            "scale_method": self.scale_method,
            "is_norm": self.is_norm,
        }

        if self._link_label_io is not None:
            output["link"] = self._link_label_io

        return output
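
    # Illustrative sketch (values are examples, not from the original docs):
    # a plain dimensionless parameter serialises roughly as
    #
    # >>> Parameter("index", 2.3).to_dict()
    # {'name': 'index', 'value': 2.3, 'unit': '', 'error': 0, 'min': nan, 'max': nan,
    #  'frozen': False, 'interp': 'lin', 'scale_method': 'scale10', 'is_norm': False}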
    def autoscale(self):
        """Autoscale the parameters.

        Set ``factor`` and ``scale`` according to ``scale_method`` attribute

        Available ``scale_method``

        * ``scale10`` sets ``scale`` to power of 10,
          so that abs(factor) is in the range 1 to 10
        * ``factor1`` sets ``factor, scale = 1, value``

        For ``scale10`` the sign of the value is stored in ``factor``,
        i.e. the ``scale`` is always positive.
        If ``scale_method`` is None the scaling is ignored.
        """
        if self.scale_method == "scale10":
            value = self.value
            if value != 0:
                exponent = np.floor(np.log10(np.abs(value)))
                scale = np.power(10.0, exponent)
                self.factor = value / scale
                self.scale = scale
        elif self.scale_method == "factor1":
            self.factor, self.scale = 1, self.value
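
# Illustrative sketch of autoscaling (numbers are examples, not from the original
# docs). With the default scale_method="scale10", autoscale() moves the power of
# ten into ``scale`` so that the optimiser works on a factor of order unity:
#
# >>> p = Parameter("amplitude", value=3.5e-12, unit="cm-2 s-1 TeV-1")
# >>> p.autoscale()
#
# after which p.scale is 1e-12, p.factor is approximately 3.5, and p.value is
# unchanged. With scale_method="factor1" instead, factor becomes 1 and scale
# takes the full value.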
class Parameters(collections.abc.Sequence):
    """Parameters container.

    - List of `Parameter` objects.
    - Covariance matrix.

    Parameters
    ----------
    parameters : list of `Parameter`
        List of parameters
    """

    def __init__(self, parameters=None):
        if parameters is None:
            parameters = []
        else:
            parameters = list(parameters)

        self._parameters = parameters
    def check_limits(self):
        """Check parameter limits and emit a warning"""
        for par in self:
            par.check_limits()
    @property
    def types(self):
        """Parameter types"""
        return [par.type for par in self]

    @property
    def min(self):
        """Parameter minima (`numpy.ndarray`)."""
        return np.array([_.min for _ in self._parameters], dtype=np.float64)

    @min.setter
    def min(self, min_array):
        """Parameter minima (`numpy.ndarray`)."""
        if not len(self) == len(min_array):
            raise ValueError("Minima must have same length as parameter list")

        for min_, par in zip(min_array, self):
            par.min = min_

    @property
    def max(self):
        """Parameter maxima (`numpy.ndarray`)."""
        return np.array([_.max for _ in self._parameters], dtype=np.float64)

    @max.setter
    def max(self, max_array):
        """Parameter maxima (`numpy.ndarray`)."""
        if not len(self) == len(max_array):
            raise ValueError("Maxima must have same length as parameter list")

        for max_, par in zip(max_array, self):
            par.max = max_

    @property
    def value(self):
        """Parameter values (`numpy.ndarray`)."""
        return np.array([_.value for _ in self._parameters], dtype=np.float64)

    @value.setter
    def value(self, values):
        """Parameter values (`numpy.ndarray`)."""
        if not len(self) == len(values):
            raise ValueError("Values must have same length as parameter list")

        for value, par in zip(values, self):
            par.value = value
    @classmethod
    def from_stack(cls, parameters_list):
        """Create `Parameters` by stacking a list of other `Parameters` objects.

        Parameters
        ----------
        parameters_list : list of `Parameters`
            List of `Parameters` objects
        """
        pars = itertools.chain(*parameters_list)
        return cls(pars)
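
    # Illustrative sketch (parameter names are examples, not from the original docs):
    #
    # >>> pars = Parameters.from_stack(
    # ...     [Parameters([Parameter("index", 2.0)]), Parameters([Parameter("norm", 1.0)])]
    # ... )
    # >>> pars.names
    # ['index', 'norm']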
    def copy(self):
        """A deep copy"""
        return copy.deepcopy(self)
    @property
    def norm_parameters(self):
        """List of norm parameters"""
        return self.__class__([par for par in self._parameters if par.is_norm])

    @property
    def free_parameters(self):
        """List of free parameters"""
        return self.__class__([par for par in self._parameters if not par.frozen])

    @property
    def unique_parameters(self):
        """Unique parameters (`Parameters`)."""
        return self.__class__(dict.fromkeys(self._parameters))

    @property
    def names(self):
        """List of parameter names"""
        return [par.name for par in self._parameters]
    def index(self, val):
        """Get position index for a given parameter.

        The input can be a `Parameter` object, a parameter name (str),
        or a parameter index (int), in which case the index is simply returned.
        """
        if isinstance(val, int):
            return val
        elif isinstance(val, Parameter):
            return self._parameters.index(val)
        elif isinstance(val, str):
            for idx, par in enumerate(self._parameters):
                if val == par.name:
                    return idx

            raise IndexError(f"No parameter: {val!r}")
        else:
            raise TypeError(f"Invalid type: {type(val)!r}")
    def __getitem__(self, key):
        """Access parameter by name, index or boolean mask"""
        if isinstance(key, np.ndarray) and key.dtype == bool:
            return self.__class__(list(np.array(self._parameters)[key]))
        else:
            idx = self.index(key)
            return self._parameters[idx]

    def __len__(self):
        return len(self._parameters)

    def __add__(self, other):
        if isinstance(other, Parameters):
            return Parameters.from_stack([self, other])
        else:
            raise TypeError(f"Invalid type: {other!r}")
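
    # Illustrative sketch of the three access modes (names are examples,
    # not from the original docs):
    #
    # >>> pars = Parameters([Parameter("index", 2.0), Parameter("norm", 1.0, frozen=True)])
    # >>> pars["index"] is pars[0]
    # True
    # >>> pars[np.array([False, True])].names   # boolean mask returns a new Parameters
    # ['norm']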
    def to_dict(self):
        data = []

        for par in self._parameters:
            data.append(par.to_dict())

        return data
    def to_table(self):
        """Convert parameter attributes to `~astropy.table.Table`."""
        rows = []
        for p in self._parameters:
            d = p.to_dict()
            if "link" not in d:
                d["link"] = ""
            for key in ["scale_method", "interp"]:
                if key in d:
                    del d[key]
            rows.append({**dict(type=p.type), **d})
        table = Table(rows)

        table["value"].format = ".4e"
        for name in ["error", "min", "max"]:
            table[name].format = ".3e"

        return table
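
    # Illustrative note (derived from the code above, not from the original docs):
    # the resulting table has one row per parameter and the columns
    # type, name, value, unit, error, min, max, frozen, is_norm, link,
    # with "value" shown in ".4e" format and "error"/"min"/"max" in ".3e".
    #
    # >>> Parameters([Parameter("index", 2.0)]).to_table()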
    def __eq__(self, other):
        all_equal = np.all([p is p_new for p, p_new in zip(self, other)])
        return all_equal and len(self) == len(other)
    @classmethod
    def from_dict(cls, data):
        parameters = []

        for par in data:
            link_label = par.pop("link", None)
            parameter = Parameter(**par)
            parameter._link_label_io = link_label
            parameters.append(parameter)

        return cls(parameters=parameters)
    def set_parameter_factors(self, factors):
        """Set the factors of all free parameters.

        Used in the optimizer interface.
        """
        idx = 0
        for parameter in self._parameters:
            if not parameter.frozen:
                parameter.factor = factors[idx]
                idx += 1
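
    # Illustrative sketch (assumes the code above): the optimiser passes factors
    # for the free parameters only, in order; frozen parameters are skipped.
    #
    # >>> pars = Parameters([Parameter("index", 2.0), Parameter("norm", 1.0, frozen=True)])
    # >>> pars.set_parameter_factors([2.5])
    # >>> pars.value.tolist()
    # [2.5, 1.0]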
    def autoscale(self):
        """Autoscale all parameters.

        See :func:`~gammapy.modeling.Parameter.autoscale`
        """
        for par in self._parameters:
            par.autoscale()
    def select(
        self,
        name=None,
        type=None,
        frozen=None,
    ):
        """Select parameters matching the given conditions.

        Parameters
        ----------
        name : str or list
            Name of the parameter
        type : {None, "spatial", "spectral", "temporal"}
            Type of the model the parameter belongs to
        frozen : bool
            Select frozen parameters if True, exclude them if False.

        Returns
        -------
        parameters : `Parameters`
            Selected parameters
        """
        selection = np.ones(len(self), dtype=bool)

        if name and not isinstance(name, list):
            name = [name]

        for idx, par in enumerate(self):
            if name:
                selection[idx] &= np.any([_ == par.name for _ in name])

            if type:
                selection[idx] &= type == par.type

            if frozen is not None:
                if frozen:
                    selection[idx] &= par.frozen
                else:
                    selection[idx] &= ~par.frozen

        return self[selection]
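
    # Illustrative sketch (names are examples, not from the original docs):
    #
    # >>> pars = Parameters([Parameter("index", 2.0), Parameter("norm", 1.0, frozen=True)])
    # >>> pars.select(frozen=False).names
    # ['index']
    # >>> pars.select(name=["index", "norm"], frozen=True).names
    # ['norm']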
    def freeze_all(self):
        """Freeze all parameters"""
        for par in self._parameters:
            par.frozen = True
    def unfreeze_all(self):
        """Unfreeze all parameters (even those frozen by default)"""
        for par in self._parameters:
            par.frozen = False
    def restore_status(self, restore_values=True):
        """Context manager to restore status.

        A copy of the values is made on enter,
        and those values are restored on exit.

        Parameters
        ----------
        restore_values : bool
            Restore values if True, otherwise restore only frozen status.

        Examples
        --------
        ::

            from gammapy.modeling.models import PowerLawSpectralModel

            pwl = PowerLawSpectralModel(index=2)
            with pwl.parameters.restore_status():
                pwl.parameters["index"].value = 3
            print(pwl.parameters["index"].value)
        """
        return restore_parameters_status(self, restore_values)
class restore_parameters_status:
    def __init__(self, parameters, restore_values=True):
        self.restore_values = restore_values
        self._parameters = parameters
        self.values = [_.value for _ in parameters]
        self.frozen = [_.frozen for _ in parameters]

    def __enter__(self):
        pass

    def __exit__(self, type, value, traceback):
        for value, par, frozen in zip(self.values, self._parameters, self.frozen):
            if self.restore_values:
                par.value = value
            par.frozen = frozen