Source code for gammapy.modeling.parameter

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Model parameter classes."""
import collections.abc
import copy
import html
import itertools
import logging
import numpy as np
from astropy import units as u
from astropy.table import Table
from gammapy.utils.deprecation import deprecated_attribute
from gammapy.utils.interpolation import interpolation_scale

__all__ = ["Parameter", "Parameters", "PriorParameter", "PriorParameters"]

log = logging.getLogger(__name__)


def _get_parameters_str(parameters):
    str_ = ""

    for par in parameters:
        if par.name == "amplitude":
            value_format, error_format = "{:10.2e}", "{:7.1e}"
        else:
            value_format, error_format = "{:10.3f}", "{:7.2f}"

        line = "\t{:21} {:8}: " + value_format + "\t {} {:<12s}\n"

        if par._link_label_io is not None:
            name = par._link_label_io
        else:
            name = par.name

        if par.frozen:
            frozen, error = "(frozen)", "\t\t"
        else:
            frozen = ""
            try:
                error = "+/- " + error_format.format(par.error)
            except AttributeError:
                error = ""
        str_ += line.format(name, frozen, par.value, error, par.unit)
    return str_.expandtabs(tabsize=2)


[docs]class Parameter: """A model parameter. Note that the parameter value has been split into a factor and scale like this:: value = factor x scale Users should interact with the ``value``, ``quantity`` or ``min`` and ``max`` properties and consider the fact that there is a ``factor``` and ``scale`` an implementation detail. That was introduced for numerical stability in parameter and error estimation methods, only in the Gammapy optimiser interface do we interact with the ``factor``, ``factor_min`` and ``factor_max`` properties, i.e. the optimiser "sees" the well-scaled problem. Parameters ---------- name : str Name. value : float or `~astropy.units.Quantity` Value. scale : float, optional Scale (sometimes used in fitting). unit : `~astropy.units.Unit` or str, optional Unit. min : float, optional Minimum (sometimes used in fitting). max : float, optional Maximum (sometimes used in fitting). frozen : bool, optional Frozen (used in fitting). error : float Parameter error. scan_min : float Minimum value for the parameter scan. Overwrites scan_n_sigma. scan_max : float Minimum value for the parameter scan. Overwrites scan_n_sigma. scan_n_values: int Number of values to be used for the parameter scan. scan_n_sigma : int Number of sigmas to scan. scan_values: `numpy.array` Scan values. Overwrites all the scan keywords before. scale_method : {'scale10', 'factor1', None} Method used to set ``factor`` and ``scale``. interp : {"lin", "sqrt", "log"} Parameter scaling to use for the scan. is_norm : bool Whether the parameter represents the flux norm of the model. prior : `~gammapy.modeling.models.Prior` Prior set on the parameter. """ norm_parameters = deprecated_attribute("norm_parameters", "1.2") is_norm = deprecated_attribute("is_norm", "1.2") def __init__( self, name, value, unit="", scale=1, min=np.nan, max=np.nan, frozen=False, error=0, scan_min=None, scan_max=None, scan_n_values=11, scan_n_sigma=2, scan_values=None, scale_method="scale10", interp="lin", is_norm=False, prior=None, ): if not isinstance(name, str): raise TypeError(f"Name must be string, got '{type(name)}' instead") self._name = name self._link_label_io = None self.scale = scale self.min = min self.max = max self.frozen = frozen self._error = error self._is_norm = is_norm self._type = None # TODO: move this to a setter method that can be called from `__set__` also! # Having it here is bad: behaviour not clear if Quantity and `unit` is passed. if isinstance(value, u.Quantity) or isinstance(value, str): val = u.Quantity(value) self.value = val.value self.unit = val.unit else: self.value = float(value) self.unit = unit self.scan_min = scan_min self.scan_max = scan_max self.scan_values = scan_values self.scan_n_values = scan_n_values self.scan_n_sigma = scan_n_sigma self.interp = interp self.scale_method = scale_method self.prior = prior def __get__(self, instance, owner): if instance is None: return self par = instance.__dict__[self.name] par._type = getattr(instance, "type", None) return par def __set__(self, instance, value): if isinstance(value, Parameter): instance.__dict__[self.name] = value else: par = instance.__dict__[self.name] raise TypeError(f"Cannot assign {value!r} to parameter {par!r}") def __set_name__(self, owner, name): if not self._name == name: raise ValueError(f"Expected parameter name '{name}', got {self._name}") @property def prior(self): """Prior applied to the parameter as a `~gammapy.modeling.models.Prior`.""" return self._prior @prior.setter def prior(self, value): if value is not None: from .models import Prior if isinstance(value, dict): from .models import Model self._prior = Model.from_dict({"prior": value}) elif isinstance(value, Prior): self._prior = value else: raise TypeError(f"Invalid type: {value!r}") else: self._prior = value
[docs] def prior_stat_sum(self): if self.prior is not None: return self.prior(self)
@property def type(self): return self._type @property def error(self): return self._error @error.setter def error(self, value): self._error = float(u.Quantity(value, unit=self.unit).value) @property def name(self): """Name as a string.""" return self._name @property def factor(self): """Factor as a float.""" return self._factor @factor.setter def factor(self, val): self._factor = float(val) @property def scale(self): """Scale as a float.""" return self._scale @scale.setter def scale(self, val): self._scale = float(val) @property def unit(self): """Unit as a `~astropy.units.Unit` object.""" return self._unit @unit.setter def unit(self, val): self._unit = u.Unit(val) @property def min(self): """Minimum as a float.""" return self._min @min.setter def min(self, val): """`~astropy.table.Table` has masked values for NaN. Replacing with NaN.""" if isinstance(val, np.ma.core.MaskedConstant): self._min = np.nan else: self._min = float(val) @property def factor_min(self): """Factor minimum as a float. This ``factor_min = min / scale`` is for the optimizer interface. """ return self.min / self.scale @property def max(self): """Maximum as a float.""" return self._max @max.setter def max(self, val): """`~astropy.table.Table` has masked values for NaN. Replacing with NaN.""" if isinstance(val, np.ma.core.MaskedConstant): self._max = np.nan else: self._max = float(val) @property def factor_max(self): """Factor maximum as a float. This ``factor_max = max / scale`` is for the optimizer interface. """ return self.max / self.scale @property def scale_method(self): """Method used to set ``factor`` and ``scale``.""" return self._scale_method @scale_method.setter def scale_method(self, val): if val not in ["scale10", "factor1"] and val is not None: raise ValueError(f"Invalid method: {val}") self._scale_method = val @property def frozen(self): """Frozen (used in fitting) (bool).""" return self._frozen @frozen.setter def frozen(self, val): if val in ["True", "False"]: val = bool(val) if not isinstance(val, bool) and not isinstance(val, np.bool_): raise TypeError(f"Invalid type: {val}, {type(val)}") self._frozen = val @property def value(self): """Value = factor x scale (float).""" return self._factor * self._scale @value.setter def value(self, val): self.factor = float(val) / self._scale @property def quantity(self): """Value times unit as a `~astropy.units.Quantity`.""" return self.value * self.unit @quantity.setter def quantity(self, val): val = u.Quantity(val) if not val.unit.is_equivalent(self.unit): raise u.UnitConversionError( f"Unit must be equivalent to {self.unit} for parameter {self.name}" ) self.value = val.value self.unit = val.unit # TODO: possibly allow to set this independently @property def conf_min(self): """Confidence minimum value as a `float`. Return parameter minimum if defined, otherwise return the scan_min. """ if not np.isnan(self.min): return self.min else: return self.scan_min # TODO: possibly allow to set this independently @property def conf_max(self): """Confidence maximum value as a `float`. Return parameter maximum if defined, otherwise return the scan_max. """ if not np.isnan(self.max): return self.max else: return self.scan_max @property def scan_min(self): """Stat scan minimum.""" if self._scan_min is None: return self.value - self.error * self.scan_n_sigma return self._scan_min @property def scan_max(self): """Stat scan maximum.""" if self._scan_max is None: return self.value + self.error * self.scan_n_sigma return self._scan_max @scan_min.setter def scan_min(self, value): """Stat scan minimum setter.""" self._scan_min = value @scan_max.setter def scan_max(self, value): """Stat scan maximum setter.""" self._scan_max = value @property def scan_n_sigma(self): """Stat scan n sigma.""" return self._scan_n_sigma @scan_n_sigma.setter def scan_n_sigma(self, n_sigma): """Stat scan n sigma.""" self._scan_n_sigma = int(n_sigma) @property def scan_values(self): """Stat scan values as a `~numpy.ndarray`.""" if self._scan_values is None: scale = interpolation_scale(self.interp) parmin, parmax = scale([self.scan_min, self.scan_max]) values = np.linspace(parmin, parmax, self.scan_n_values) return scale.inverse(values) return self._scan_values @scan_values.setter def scan_values(self, values): """Set scan values.""" self._scan_values = values
[docs] def check_limits(self): """Emit a warning or error if value is outside the minimum/maximum range.""" if not self.frozen: if (~np.isnan(self.min) and (self.value <= self.min)) or ( ~np.isnan(self.max) and (self.value >= self.max) ): log.warning( f"Value {self.value} is outside bounds [{self.min}, {self.max}]" f" for parameter '{self.name}'" )
def __repr__(self): return ( f"{self.__class__.__name__}(name={self.name!r}, value={self.value!r}, " f"factor={self.factor!r}, scale={self.scale!r}, unit={self.unit!r}, " f"min={self.min!r}, max={self.max!r}, frozen={self.frozen!r}, prior={self.prior!r}, id={hex(id(self))})" ) def _repr_html_(self): try: return self.to_html() except AttributeError: return f"<pre>{html.escape(str(self))}</pre>"
[docs] def copy(self): """Deep copy.""" return copy.deepcopy(self)
[docs] def update_from_dict(self, data): """Update parameters from a dictionary.""" keys = ["value", "unit", "min", "max", "frozen", "prior"] for k in keys: if k == "prior" and data[k] == "": data[k] = None setattr(self, k, data[k])
[docs] def to_dict(self): """Convert to dictionary.""" output = { "name": self.name, "value": self.value, "unit": self.unit.to_string("fits"), "error": self.error, "min": self.min, "max": self.max, "frozen": self.frozen, "interp": self.interp, "scale_method": self.scale_method, "is_norm": self._is_norm, } if self._link_label_io is not None: output["link"] = self._link_label_io if self.prior is not None: output["prior"] = self.prior.to_dict()["prior"] return output
[docs] def autoscale(self): """Autoscale the parameters. Set ``factor`` and ``scale`` according to ``scale_method`` attribute. Available ``scale_method``. * ``scale10`` sets ``scale`` to power of 10, so that abs(factor) is in the range 1 to 10 * ``factor1`` sets ``factor, scale = 1, value`` In both cases the sign of value is stored in ``factor``, i.e. the ``scale`` is always positive. If ``scale_method`` is None the scaling is ignored. """ if self.scale_method == "scale10": value = self.value if value != 0: exponent = np.floor(np.log10(np.abs(value))) scale = np.power(10.0, exponent) self.factor = value / scale self.scale = scale elif self.scale_method == "factor1": self.factor, self.scale = 1, self.value
[docs]class Parameters(collections.abc.Sequence): """Parameters container. - List of `Parameter` objects. - Covariance matrix. Parameters ---------- parameters : list of `Parameter` List of parameters. """ def __init__(self, parameters=None): if parameters is None: parameters = [] else: parameters = list(parameters) self._parameters = parameters def _repr_html_(self): try: return self.to_html() except AttributeError: return f"<pre>{html.escape(str(self))}</pre>"
[docs] def check_limits(self): """Check parameter limits and emit a warning.""" for par in self: par.check_limits()
@property def prior(self): return [par.prior for par in self]
[docs] def prior_stat_sum(self): parameters_stat_sum = 0 for par in self: if par.prior is not None: parameters_stat_sum += par.prior_stat_sum() return parameters_stat_sum
@property def types(self): """Parameter types.""" return [par.type for par in self] @property def min(self): """Parameter minima as a `numpy.ndarray`.""" return np.array([_.min for _ in self._parameters], dtype=np.float64) @min.setter def min(self, min_array): """Parameter minima as a `numpy.ndarray`.""" if not len(self) == len(min_array): raise ValueError("Minima must have same length as parameter list") for min_, par in zip(min_array, self): par.min = min_ @property def max(self): """Parameter maxima as a `numpy.ndarray`.""" return np.array([_.max for _ in self._parameters], dtype=np.float64) @max.setter def max(self, max_array): """Parameter maxima as a `numpy.ndarray`.""" if not len(self) == len(max_array): raise ValueError("Maxima must have same length as parameter list") for max_, par in zip(max_array, self): par.max = max_ @property def value(self): """Parameter values as a `numpy.ndarray`.""" return np.array([_.value for _ in self._parameters], dtype=np.float64) @value.setter def value(self, values): """Parameter values as a `numpy.ndarray`.""" if not len(self) == len(values): raise ValueError("Values must have same length as parameter list") for value, par in zip(values, self): par.value = value
[docs] @classmethod def from_stack(cls, parameters_list): """Create `Parameters` by stacking a list of other `Parameters` objects. Parameters ---------- parameters_list : list of `Parameters` List of `Parameters` objects. """ pars = itertools.chain(*parameters_list) return cls(pars)
[docs] def copy(self): """Deep copy.""" return copy.deepcopy(self)
@property def norm_parameters(self): """List of norm parameters.""" return self.__class__([par for par in self._parameters if par._is_norm]) @property def free_parameters(self): """List of free parameters.""" return self.__class__([par for par in self._parameters if not par.frozen]) @property def unique_parameters(self): """Unique parameters as a `Parameters` object.""" return self.__class__(dict.fromkeys(self._parameters)) @property def names(self): """List of parameter names.""" return [par.name for par in self._parameters]
[docs] def index(self, val): """Get position index for a given parameter. The input can be a parameter object, parameter name (str) or if a parameter index (int) is passed in, it is simply returned. """ if isinstance(val, int): return val elif isinstance(val, Parameter): return self._parameters.index(val) elif isinstance(val, str): for idx, par in enumerate(self._parameters): if val == par.name: return idx raise IndexError(f"No parameter: {val!r}") else: raise TypeError(f"Invalid type: {type(val)!r}")
def __getitem__(self, key): """Access parameter by name, index or boolean mask.""" if isinstance(key, np.ndarray) and key.dtype == bool: return self.__class__(list(np.array(self._parameters)[key])) else: idx = self.index(key) return self._parameters[idx] def __len__(self): return len(self._parameters) def __add__(self, other): if isinstance(other, Parameters): return Parameters.from_stack([self, other]) else: raise TypeError(f"Invalid type: {other!r}")
[docs] def to_dict(self): data = [] for par in self._parameters: data.append(par.to_dict()) return data
@staticmethod def _create_default_table(): name_to_type = { "type": "str", "name": "str", "value": "float", "unit": "str", "error": "float", "min": "float", "max": "float", "frozen": "bool", "is_norm": "bool", "link": "str", "prior": "str", } return Table(names=name_to_type.keys(), dtype=name_to_type.values())
[docs] def to_table(self): """Convert parameter attributes to `~astropy.table.Table`.""" table = self._create_default_table() for p in self._parameters: d = {k: v for k, v in p.to_dict().items() if k in table.colnames} if "prior" in d: d["prior"] = d["prior"]["type"] table.add_row(d) table["value"].format = ".4e" for name in ["error", "min", "max"]: table[name].format = ".3e" return table
def __eq__(self, other): all_equal = np.all([p is p_new for p, p_new in zip(self, other)]) return all_equal and len(self) == len(other)
[docs] @classmethod def from_dict(cls, data): parameters = [] for par in data: link_label = par.pop("link", None) parameter = Parameter(**par) parameter._link_label_io = link_label parameters.append(parameter) return cls(parameters=parameters)
[docs] def set_parameter_factors(self, factors): """Set factor of all parameters. Used in the optimizer interface. """ idx = 0 for parameter in self._parameters: if not parameter.frozen: parameter.factor = factors[idx] idx += 1
[docs] def autoscale(self): """Autoscale all parameters. See :func:`~gammapy.modeling.Parameter.autoscale`. """ for par in self._parameters: par.autoscale()
[docs] def select( self, name=None, type=None, frozen=None, ): """Create a mask of models, true if all conditions are verified. Parameters ---------- name : str or list, optional Name of the parameter. Default is None. type : {None, "spatial", "spectral", "temporal"} Type of models. Default is None. frozen : bool, optional Select frozen parameters if True, exclude them if False. Default is None. Returns ------- parameters : `Parameters` Selected parameters. """ selection = np.ones(len(self), dtype=bool) if name and not isinstance(name, list): name = [name] for idx, par in enumerate(self): if name: selection[idx] &= np.any([_ == par.name for _ in name]) if type: selection[idx] &= type == par.type if frozen is not None: if frozen: selection[idx] &= par.frozen else: selection[idx] &= ~par.frozen return self[selection]
[docs] def freeze_all(self): """Freeze all parameters.""" for par in self._parameters: par.frozen = True
[docs] def unfreeze_all(self): """Unfreeze all parameters (even those frozen by default).""" for par in self._parameters: par.frozen = False
[docs] def restore_status(self, restore_values=True): """Context manager to restore status. A copy of the values is made on enter, and those values are restored on exit. Parameters ---------- restore_values : bool, optional Restore values if True, otherwise restore only frozen status. Default is None. Examples -------- >>> from gammapy.modeling.models import PowerLawSpectralModel >>> pwl = PowerLawSpectralModel(index=2) >>> with pwl.parameters.restore_status(): ... pwl.parameters["index"].value = 3 >>> print(pwl.parameters["index"].value) # doctest: +SKIP """ return restore_parameters_status(self, restore_values)
class restore_parameters_status: def __init__(self, parameters, restore_values=True): self.restore_values = restore_values self._parameters = parameters self.values = [_.value for _ in parameters] self.frozen = [_.frozen for _ in parameters] def __enter__(self): pass def __exit__(self, type, value, traceback): for value, par, frozen in zip(self.values, self._parameters, self.frozen): if self.restore_values: par.value = value par.frozen = frozen
[docs]class PriorParameter(Parameter): def __init__( self, name, value, unit="", scale=1, min=np.nan, max=np.nan, error=0, ): if not isinstance(name, str): raise TypeError(f"Name must be string, got '{type(name)}' instead") self._name = name self.scale = scale self.min = min self.max = max self._error = error if isinstance(value, u.Quantity) or isinstance(value, str): val = u.Quantity(value) self.value = val.value self.unit = val.unit else: self.factor = value self.unit = unit self._type = "prior"
[docs] def to_dict(self): """Convert to dictionary.""" output = { "name": self.name, "value": self.value, "unit": self.unit.to_string("fits"), "error": self.error, "min": self.min, "max": self.max, } return output
def __repr__(self): return ( f"{self.__class__.__name__}(name={self.name!r}, value={self.value!r}, " f"factor={self.factor!r}, scale={self.scale!r}, unit={self.unit!r}, " f"min={self.min!r}, max={self.max!r})" )
[docs]class PriorParameters(Parameters): def __init__(self, parameters=None): if parameters is None: parameters = [] else: parameters = list(parameters) self._parameters = parameters
[docs] def to_table(self): """Convert parameter attributes to `~astropy.table.Table`.""" rows = [] for p in self._parameters: d = p.to_dict() rows.append({**dict(type=p.type), **d}) table = Table(rows) table["value"].format = ".4e" for name in ["error", "min", "max"]: table[name].format = ".3e" return table
[docs] @classmethod def from_dict(cls, data): parameters = [] for par in data: parameter = PriorParameter(**par) parameters.append(parameter) return cls(parameters=parameters)