Source code for gammapy.modeling.parameter

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""Model parameter classes."""
import collections.abc
import copy
import itertools
import logging
import numpy as np
from astropy import units as u
from gammapy.utils.interpolation import interpolation_scale
from gammapy.utils.table import table_from_row_data

__all__ = ["Parameter", "Parameters"]

log = logging.getLogger(__name__)


def _get_parameters_str(parameters):
    str_ = ""

    for par in parameters:
        if par.name == "amplitude":
            value_format, error_format = "{:10.2e}", "{:7.1e}"
        else:
            value_format, error_format = "{:10.3f}", "{:7.2f}"

        line = "\t{:21} {:8}: " + value_format + "\t {} {:<12s}\n"

        if par._link_label_io is not None:
            name = par._link_label_io
        else:
            name = par.name

        if par.frozen:
            frozen, error = "(frozen)", "\t\t"
        else:
            frozen = ""
            try:
                error = "+/- " + error_format.format(par.error)
            except AttributeError:
                error = ""
        str_ += line.format(name, frozen, par.value, error, par.unit)
    return str_.expandtabs(tabsize=2)


class Parameter:
    """A model parameter.

    Note that the parameter value has been split into
    a factor and scale like this::

        value = factor x scale

    Users should interact with the ``value``, ``quantity``
    or ``min`` and ``max`` properties and consider the fact
    that there is a ``factor``` and ``scale`` an implementation detail.

    That was introduced for numerical stability in parameter and error
    estimation methods, only in the Gammapy optimiser interface do we
    interact with the ``factor``, ``factor_min`` and ``factor_max`` properties,
    i.e. the optimiser "sees" the well-scaled problem.

    Parameters
    ----------
    name : str
        Name
    value : float or `~astropy.units.Quantity`
        Value
    scale : float, optional
        Scale (sometimes used in fitting)
    unit : `~astropy.units.Unit` or str, optional
        Unit
    min : float, optional
        Minimum (sometimes used in fitting)
    max : float, optional
        Maximum (sometimes used in fitting)
    frozen : bool, optional
        Frozen? (used in fitting)
    error : float
        Parameter error
    scan_min : float
        Minimum value for the parameter scan. Overwrites scan_n_sigma.
    scan_max : float
        Minimum value for the parameter scan. Overwrites scan_n_sigma.
    scan_n_values: int
        Number of values to be used for the parameter scan.
    scan_n_sigma : int
        Number of sigmas to scan.
    scan_values: `numpy.array`
        Scan values. Overwrites all of the scan keywords before.
    scale_method : {'scale10', 'factor1', None}
        Method used to set ``factor`` and ``scale``
    interp : {"lin", "sqrt", "log"}
        Parameter scaling to use for the scan.
    is_norm : bool
        Whether the parameter represents the flux norm of the model.
    """

    def __init__(
        self,
        name,
        value,
        unit="",
        scale=1,
        min=np.nan,
        max=np.nan,
        frozen=False,
        error=0,
        scan_min=None,
        scan_max=None,
        scan_n_values=11,
        scan_n_sigma=2,
        scan_values=None,
        scale_method="scale10",
        interp="lin",
        is_norm=False,
    ):
        if not isinstance(name, str):
            raise TypeError(f"Name must be string, got '{type(name)}' instead")

        self._name = name
        self._link_label_io = None
        self.scale = scale
        self.min = min
        self.max = max
        self.frozen = frozen
        self._error = error
        self._is_norm = is_norm
        self._type = None

        # TODO: move this to a setter method that can be called from `__set__` also!
        # Having it here is bad: behaviour not clear if Quantity and `unit` is passed.
        if isinstance(value, u.Quantity) or isinstance(value, str):
            val = u.Quantity(value)
            self.value = val.value
            self.unit = val.unit
        else:
            self.factor = value
            self.unit = unit

        self.scan_min = scan_min
        self.scan_max = scan_max
        self.scan_values = scan_values
        self.scan_n_values = scan_n_values
        self.scan_n_sigma = scan_n_sigma
        self.interp = interp
        self.scale_method = scale_method

    def __get__(self, instance, owner):
        if instance is None:
            return self

        par = instance.__dict__[self.name]
        par._type = getattr(instance, "type", None)
        return par

    def __set__(self, instance, value):
        if isinstance(value, Parameter):
            instance.__dict__[self.name] = value
        else:
            par = instance.__dict__[self.name]
            raise TypeError(f"Cannot assign {value!r} to parameter {par!r}")

    def __set_name__(self, owner, name):
        if not self._name == name:
            raise ValueError(f"Expected parameter name '{name}', got {self._name}")

    @property
    def is_norm(self):
        """Whether the parameter represents the norm of the model"""
        return self._is_norm

    @property
    def type(self):
        return self._type

    @property
    def error(self):
        return self._error

    @error.setter
    def error(self, value):
        self._error = float(u.Quantity(value, unit=self.unit).value)

    @property
    def name(self):
        """Name (str)."""
        return self._name

    @property
    def factor(self):
        """Factor (float)."""
        return self._factor

    @factor.setter
    def factor(self, val):
        self._factor = float(val)

    @property
    def scale(self):
        """Scale (float)."""
        return self._scale

    @scale.setter
    def scale(self, val):
        self._scale = float(val)

    @property
    def unit(self):
        """Unit (`~astropy.units.Unit`)."""
        return self._unit

    @unit.setter
    def unit(self, val):
        self._unit = u.Unit(val)

    @property
    def min(self):
        """Minimum (float)."""
        return self._min

    @min.setter
    def min(self, val):
        "Astropy Table has masked values for NaN. Replacing with np.nan."
        if isinstance(val, np.ma.core.MaskedConstant):
            self._min = np.nan
        else:
            self._min = float(val)

    @property
    def factor_min(self):
        """Factor min (float).

        This ``factor_min = min / scale`` is for the optimizer interface.
        """
        return self.min / self.scale

    @property
    def max(self):
        """Maximum (float)."""
        return self._max

    @max.setter
    def max(self, val):
        "Astropy Table has masked values for NaN. Replacing with np.nan."
        if isinstance(val, np.ma.core.MaskedConstant):
            self._max = np.nan
        else:
            self._max = float(val)

    @property
    def factor_max(self):
        """Factor max (float).

        This ``factor_max = max / scale`` is for the optimizer interface.
        """
        return self.max / self.scale

    @property
    def scale_method(self):
        """Method used to set ``factor`` and ``scale``"""
        return self._scale_method

    @scale_method.setter
    def scale_method(self, val):
        if val not in ["scale10", "factor1"] and val is not None:
            raise ValueError(f"Invalid method: {val}")
        self._scale_method = val

    @property
    def frozen(self):
        """Frozen? (used in fitting) (bool)."""
        return self._frozen

    @frozen.setter
    def frozen(self, val):
        if val in ["True", "False"]:
            val = bool(val)
        if not isinstance(val, bool) and not isinstance(val, np.bool_):
            raise TypeError(f"Invalid type: {val}, {type(val)}")
        self._frozen = val

    @property
    def value(self):
        """Value = factor x scale (float)."""
        return self._factor * self._scale

    @value.setter
    def value(self, val):
        self.factor = float(val) / self._scale

    @property
    def quantity(self):
        """Value times unit (`~astropy.units.Quantity`)."""
        return self.value * self.unit

    @quantity.setter
    def quantity(self, val):
        val = u.Quantity(val)

        if not val.unit.is_equivalent(self.unit):
            raise u.UnitConversionError(
                f"Unit must be equivalent to {self.unit} for parameter {self.name}"
            )

        self.value = val.value
        self.unit = val.unit

    # TODO: possibly allow to set this independently
    @property
    def conf_min(self):
        """Confidence min value (`float`)

        Returns parameter minimum if defined else the scan_min
        """
        if not np.isnan(self.min):
            return self.min
        else:
            return self.scan_min

    # TODO: possibly allow to set this independently
    @property
    def conf_max(self):
        """Confidence max value (`float`)

        Returns parameter maximum if defined else the scan_max
        """
        if not np.isnan(self.max):
            return self.max
        else:
            return self.scan_max

    @property
    def scan_min(self):
        """Stat scan min"""
        if self._scan_min is None:
            return self.value - self.error * self.scan_n_sigma

        return self._scan_min

    @property
    def scan_max(self):
        """Stat scan max"""
        if self._scan_max is None:
            return self.value + self.error * self.scan_n_sigma

        return self._scan_max

    @scan_min.setter
    def scan_min(self, value):
        """Stat scan min setter"""
        self._scan_min = value

    @scan_max.setter
    def scan_max(self, value):
        """Stat scan max setter"""
        self._scan_max = value

    @property
    def scan_n_sigma(self):
        """Stat scan n sigma"""
        return self._scan_n_sigma

    @scan_n_sigma.setter
    def scan_n_sigma(self, n_sigma):
        """Stat scan n sigma"""
        self._scan_n_sigma = int(n_sigma)

    @property
    def scan_values(self):
        """Stat scan values (`~numpy.ndarray`)"""
        if self._scan_values is None:
            scale = interpolation_scale(self.interp)
            parmin, parmax = scale([self.scan_min, self.scan_max])
            values = np.linspace(parmin, parmax, self.scan_n_values)
            return scale.inverse(values)

        return self._scan_values

    @scan_values.setter
    def scan_values(self, values):
        """Set scan values"""
        self._scan_values = values

    def check_limits(self):
        """Emit a warning or error if value is outside the min/max range"""
        if not self.frozen:
            if (~np.isnan(self.min) and (self.value <= self.min)) or (
                ~np.isnan(self.max) and (self.value >= self.max)
            ):
                log.warning(
                    f"Value {self.value} is outside bounds [{self.min}, {self.max}]"
                    f" for parameter '{self.name}'"
                )

    def __repr__(self):
        return (
            f"{self.__class__.__name__}(name={self.name!r}, value={self.value!r}, "
            f"factor={self.factor!r}, scale={self.scale!r}, unit={self.unit!r}, "
            f"min={self.min!r}, max={self.max!r}, frozen={self.frozen!r}, id={hex(id(self))})"
        )

    def copy(self):
        """A deep copy"""
        return copy.deepcopy(self)

    def update_from_dict(self, data):
        """Update parameters from a dict.
        Protection against changing parameter model, type, name."""
        keys = ["value", "unit", "min", "max", "frozen"]
        for k in keys:
            setattr(self, k, data[k])

    def to_dict(self):
        """Convert to dict."""
        output = {
            "name": self.name,
            "value": self.value,
            "unit": self.unit.to_string("fits"),
            "error": self.error,
            "min": self.min,
            "max": self.max,
            "frozen": self.frozen,
            "interp": self.interp,
            "scale_method": self.scale_method,
            "is_norm": self.is_norm,
        }

        if self._link_label_io is not None:
            output["link"] = self._link_label_io

        return output

    def autoscale(self):
        """Autoscale the parameters.

        Set ``factor`` and ``scale`` according to ``scale_method`` attribute

        Available ``scale_method``

        * ``scale10`` sets ``scale`` to power of 10,
          so that abs(factor) is in the range 1 to 10
        * ``factor1`` sets ``factor, scale = 1, value``

        In both cases the sign of value is stored in ``factor``,
        i.e. the ``scale`` is always positive.
        If ``scale_method`` is None the scaling is ignored.

        """
        if self.scale_method == "scale10":
            value = self.value
            if value != 0:
                exponent = np.floor(np.log10(np.abs(value)))
                scale = np.power(10.0, exponent)
                self.factor = value / scale
                self.scale = scale

        elif self.scale_method == "factor1":
            self.factor, self.scale = 1, self.value


[docs]class Parameters(collections.abc.Sequence): """Parameters container. - List of `Parameter` objects. - Covariance matrix. Parameters ---------- parameters : list of `Parameter` List of parameters """ def __init__(self, parameters=None): if parameters is None: parameters = [] else: parameters = list(parameters) self._parameters = parameters
[docs] def check_limits(self): """Check parameter limits and emit a warning""" for par in self: par.check_limits()
@property def types(self): """Parameter types""" return [par.type for par in self] @property def min(self): """Parameter mins (`numpy.ndarray`).""" return np.array([_.min for _ in self._parameters], dtype=np.float64) @min.setter def min(self, min_array): """Parameter minima (`numpy.ndarray`).""" if not len(self) == len(min_array): raise ValueError("Minima must have same length as parameter list") for min_, par in zip(min_array, self): par.min = min_ @property def max(self): """Parameter maxima (`numpy.ndarray`).""" return np.array([_.max for _ in self._parameters], dtype=np.float64) @max.setter def max(self, max_array): """Parameter maxima (`numpy.ndarray`).""" if not len(self) == len(max_array): raise ValueError("Maxima must have same length as parameter list") for max_, par in zip(max_array, self): par.max = max_ @property def value(self): """Parameter values (`numpy.ndarray`).""" return np.array([_.value for _ in self._parameters], dtype=np.float64) @value.setter def value(self, values): """Parameter values (`numpy.ndarray`).""" if not len(self) == len(values): raise ValueError("Values must have same length as parameter list") for value, par in zip(values, self): par.value = value
[docs] @classmethod def from_stack(cls, parameters_list): """Create `Parameters` by stacking a list of other `Parameters` objects. Parameters ---------- parameters_list : list of `Parameters` List of `Parameters` objects """ pars = itertools.chain(*parameters_list) return cls(pars)
[docs] def copy(self): """A deep copy""" return copy.deepcopy(self)
@property def free_parameters(self): """List of free parameters""" return self.__class__([par for par in self._parameters if not par.frozen]) @property def unique_parameters(self): """Unique parameters (`Parameters`).""" return self.__class__(dict.fromkeys(self._parameters)) @property def names(self): """List of parameter names""" return [par.name for par in self._parameters]
[docs] def index(self, val): """Get position index for a given parameter. The input can be a parameter object, parameter name (str) or if a parameter index (int) is passed in, it is simply returned. """ if isinstance(val, int): return val elif isinstance(val, Parameter): return self._parameters.index(val) elif isinstance(val, str): for idx, par in enumerate(self._parameters): if val == par.name: return idx raise IndexError(f"No parameter: {val!r}") else: raise TypeError(f"Invalid type: {type(val)!r}")
def __getitem__(self, key): """Access parameter by name, index or boolean mask""" if isinstance(key, np.ndarray) and key.dtype == bool: return self.__class__(list(np.array(self._parameters)[key])) else: idx = self.index(key) return self._parameters[idx] def __len__(self): return len(self._parameters) def __add__(self, other): if isinstance(other, Parameters): return Parameters.from_stack([self, other]) else: raise TypeError(f"Invalid type: {other!r}")
[docs] def to_dict(self): data = [] for par in self._parameters: data.append(par.to_dict()) return data
[docs] def to_table(self): """Convert parameter attributes to `~astropy.table.Table`.""" rows = [] for p in self._parameters: d = p.to_dict() if "link" not in d: d["link"] = "" for key in ["scale_method", "interp"]: if key in d: del d[key] rows.append({**dict(type=p.type), **d}) table = table_from_row_data(rows) table["value"].format = ".4e" for name in ["error", "min", "max"]: table[name].format = ".3e" return table
def __eq__(self, other): all_equal = np.all([p is p_new for p, p_new in zip(self, other)]) return all_equal and len(self) == len(other)
[docs] @classmethod def from_dict(cls, data): parameters = [] for par in data: link_label = par.pop("link", None) parameter = Parameter(**par) parameter._link_label_io = link_label parameters.append(parameter) return cls(parameters=parameters)
[docs] def set_parameter_factors(self, factors): """Set factor of all parameters. Used in the optimizer interface. """ idx = 0 for parameter in self._parameters: if not parameter.frozen: parameter.factor = factors[idx] idx += 1
[docs] def autoscale(self): """Autoscale all parameters. See :func:`~gammapy.modeling.Parameter.autoscale` """ for par in self._parameters: par.autoscale()
[docs] def select( self, name=None, type=None, frozen=None, ): """Create a mask of models, true if all conditions are verified Parameters ---------- name : str or list Name of the parameter type : {None, spatial, spectral, temporal} type of models frozen : bool Select frozen parameters if True, exclude them if False. Returns ------- parameters : `Parameters` Selected parameters """ selection = np.ones(len(self), dtype=bool) if name and not isinstance(name, list): name = [name] for idx, par in enumerate(self): if name: selection[idx] &= np.any([_ == par.name for _ in name]) if type: selection[idx] &= type == par.type if frozen is not None: if frozen: selection[idx] &= par.frozen else: selection[idx] &= ~par.frozen return self[selection]
[docs] def freeze_all(self): """Freeze all parameters""" for par in self._parameters: par.frozen = True
[docs] def unfreeze_all(self): """Unfreeze all parameters (even those frozen by default)""" for par in self._parameters: par.frozen = False
[docs] def restore_status(self, restore_values=True): """Context manager to restore status. A copy of the values is made on enter, and those values are restored on exit. Parameters ---------- restore_values : bool Restore values if True, otherwise restore only frozen status. Examples -------- :: from gammapy.modeling.models import PowerLawSpectralModel pwl = PowerLawSpectralModel(index=2) with pwl.parameters.restore_status(): pwl.parameters["index"].value = 3 print(pwl.parameters["index"].value) """ return restore_parameters_status(self, restore_values)
class restore_parameters_status: def __init__(self, parameters, restore_values=True): self.restore_values = restore_values self._parameters = parameters self.values = [_.value for _ in parameters] self.frozen = [_.frozen for _ in parameters] def __enter__(self): pass def __exit__(self, type, value, traceback): for value, par, frozen in zip(self.values, self._parameters, self.frozen): if self.restore_values: par.value = value par.frozen = frozen