Source code for gammapy.irf.psf.parametric

# Licensed under a 3-clause BSD style license - see LICENSE.rst
import abc
import logging
import numpy as np
from astropy import units as u
from gammapy.maps import MapAxes, MapAxis
from gammapy.utils.gauss import MultiGauss2D
from gammapy.utils.interpolation import ScaledRegularGridInterpolator
from .core import PSF

__all__ = ["ParametricPSF", "EnergyDependentMultiGaussPSF", "PSFKing"]

log = logging.getLogger(__name__)


[docs] class ParametricPSF(PSF): """Parametric PSF base class. Parameters ---------- axes : list of `MapAxis` or `MapAxes` Axes. data : dict of `~numpy.ndarray` or `~numpy.recarray` Data. unit : dict of str or `~astropy.units.Unit` Unit. meta : dict Metadata dictionary. """ @property @abc.abstractmethod def required_parameters(self): return []
[docs] @abc.abstractmethod def evaluate_direct(self, rad, **kwargs): pass
[docs] @abc.abstractmethod def evaluate_containment(self, rad, **kwargs): pass
[docs] def normalize(self): """Normalize parametric PSF.""" raise NotImplementedError
@property def quantity(self): """Quantity.""" quantity = {} for name in self.required_parameters: quantity[name] = self.data[name] * self.unit[name] return quantity @property def unit(self): """Map unit as a `~astropy.units.Unit`.""" return self._unit
[docs] def to_unit(self, unit): """Convert IRF to unit.""" raise NotImplementedError
@property def _interpolators(self): interps = {} for name in self.required_parameters: points = [a.center for a in self.axes] points_scale = tuple([a.interp for a in self.axes]) interps[name] = ScaledRegularGridInterpolator( points, values=self.quantity[name], points_scale=points_scale ) return interps
[docs] def evaluate_parameters(self, energy_true, offset): """Evaluate analytic PSF parameters at a given energy and offset. Uses nearest-neighbor interpolation. Parameters ---------- energy_true : `~astropy.units.Quantity` Energy value. offset : `~astropy.coordinates.Angle` Offset in the field of view. Returns ------- values : `~astropy.units.Quantity` Interpolated value. """ pars = {} for name in self.required_parameters: value = self._interpolators[name]((energy_true, offset)) pars[name] = value return pars
[docs] def to_table(self, format="gadf-dl3"): """Convert PSF table data to table. Parameters ---------- format : {"gadf-dl3"} Format specification. Default is "gadf-dl3". Returns ------- hdu_list : `~astropy.io.fits.HDUList` PSF in HDU list format. """ from gammapy.irf.io import IRF_DL3_HDU_SPECIFICATION table = self.axes.to_table(format="gadf-dl3") spec = IRF_DL3_HDU_SPECIFICATION[self.tag]["column_name"] for name in self.required_parameters: column_name = spec[name] table[column_name] = self.data[name].T[np.newaxis] table[column_name].unit = self.unit[name] # Create hdu and hdu list return table
[docs] @classmethod def from_table(cls, table, format="gadf-dl3"): """Create parametric PSF from `~astropy.table.Table`. Parameters ---------- table : `~astropy.table.Table` Table information. format : {"gadf-dl3"}, optional Format specification. Default is "gadf-dl3". Returns ------- psf : `~ParametricPSF` PSF class. """ from gammapy.irf.io import IRF_DL3_HDU_SPECIFICATION axes = MapAxes.from_table(table, format=format)[cls.required_axes] dtype = { "names": cls.required_parameters, "formats": len(cls.required_parameters) * (np.float32,), } data = np.empty(axes.shape, dtype=dtype) unit = {} spec = IRF_DL3_HDU_SPECIFICATION[cls.tag]["column_name"] for name in cls.required_parameters: column = table[spec[name]] values = column.data[0].transpose() # This fixes some files where sigma is written as zero if "sigma" in name: values[values == 0] = 1.0 data[name] = values.reshape(axes.shape) unit[name] = column.unit or "" unit = {key: u.Unit(val) for key, val in unit.items()} return cls(axes=axes, data=data, meta=table.meta.copy(), unit=unit)
[docs] def to_psf3d(self, rad=None): """Create a PSF3D from a parametric PSF. It will be defined on the same energy and offset values than the input PSF. Parameters ---------- rad : `~astropy.units.Quantity` Rad values. Returns ------- psf3d : `~gammapy.irf.PSF3D` 3D PSF. """ from gammapy.datasets.map import RAD_AXIS_DEFAULT from gammapy.irf import PSF3D offset_axis = self.axes["offset"] energy_axis_true = self.axes["energy_true"] if rad is None: rad_axis = RAD_AXIS_DEFAULT else: rad_axis = MapAxis.from_edges(rad, name="rad") axes = MapAxes([energy_axis_true, offset_axis, rad_axis]) data = self.evaluate(**axes.get_coord()) return PSF3D(axes=axes, data=data.value, unit=data.unit, meta=self.meta.copy())
def __str__(self): str_ = f"{self.__class__.__name__}\n" str_ += "-" * len(self.__class__.__name__) + "\n\n" str_ += f"\taxes : {self.axes.names}\n" str_ += f"\tshape : {self.data.shape}\n" str_ += f"\tndim : {len(self.axes)}\n" str_ += f"\tparameters: {self.required_parameters}\n" return str_.expandtabs(tabsize=2)
[docs] def containment(self, rad, **kwargs): """Containment of the PSF at given axes coordinates. Parameters ---------- rad : `~astropy.units.Quantity` Rad value. **kwargs : dict Other coordinates. Returns ------- containment : `~numpy.ndarray` Containment. """ pars = self.evaluate_parameters(**kwargs) containment = self.evaluate_containment(rad=rad, **pars) return containment
[docs] def evaluate(self, rad, **kwargs): """Evaluate the PSF model. Parameters ---------- rad : `~astropy.coordinates.Angle` Offset from PSF center used for evaluating the PSF on a grid. **kwargs : dict Other coordinates. Returns ------- psf_value : `~astropy.units.Quantity` PSF value. """ pars = self.evaluate_parameters(**kwargs) value = self.evaluate_direct(rad=rad, **pars) return value
[docs] def is_allclose(self, other, rtol_axes=1e-3, atol_axes=1e-6, **kwargs): """Compare two data IRFs for equivalency. Parameters ---------- other : `gammapy.irfs.ParametricPSF` The PSF to compare against. rtol_axes : float, optional Relative tolerance for the axis comparison. Default is 1e-3. atol_axes : float, optional Relative tolerance for the axis comparison. Default is 1e-6. **kwargs : dict Keywords passed to `numpy.allclose`. Returns ------- is_allclose : bool Whether the IRF is all close. """ if not isinstance(other, self.__class__): return TypeError(f"Cannot compare {type(self)} and {type(other)}") data_eq = True for key in self.quantity.keys(): if self.quantity[key].shape != other.quantity[key].shape: return False data_eq &= np.allclose(self.quantity[key], other.quantity[key], **kwargs) axes_eq = self.axes.is_allclose(other.axes, rtol=rtol_axes, atol=atol_axes) return axes_eq and data_eq
def get_sigmas_and_norms(**kwargs): """Convert scale and amplitude to norms.""" sigmas = u.Quantity([kwargs[f"sigma_{idx}"] for idx in [1, 2, 3]]) scale = kwargs["scale"] ones = np.ones(scale.shape) amplitudes = u.Quantity([ones, kwargs["ampl_2"], kwargs["ampl_3"]]) norms = 2 * scale * amplitudes * sigmas**2 return sigmas, norms
[docs] class EnergyDependentMultiGaussPSF(ParametricPSF): """Triple Gauss analytical PSF depending on true energy and offset. Parameters ---------- axes : list of `~gammapy.maps.MapAxis` or `~gammapy.maps.MapAxes` Required axes (in the given order) are: * energy_true (true energy axis) * migra_axis (energy migration axis) * offset_axis (field of view offset axis) data : `~numpy.recarray` Data array. meta : dict Metadata dictionary. Examples -------- Plot R68 of the PSF vs. offset and true energy: .. plot:: :include-source: import matplotlib.pyplot as plt from gammapy.irf import EnergyDependentMultiGaussPSF filename = '$GAMMAPY_DATA/cta-1dc/caldb/data/cta/1dc/bcf/South_z20_50h/irf_file.fits' psf = EnergyDependentMultiGaussPSF.read(filename, hdu='POINT SPREAD FUNCTION') psf.plot_containment_radius(fraction=0.68) plt.show() """ tag = "psf_3gauss" required_axes = ["energy_true", "offset"] required_parameters = ["sigma_1", "sigma_2", "sigma_3", "scale", "ampl_2", "ampl_3"]
[docs] @staticmethod def evaluate_containment(rad, **kwargs): """Containment of the PSF at given axes coordinates. Parameters ---------- rad : `~astropy.units.Quantity` Rad value. **kwargs : dict Parameters, see `required_parameters`. Returns ------- containment : `~numpy.ndarray` Containment. """ sigmas, norms = get_sigmas_and_norms(**kwargs) m = MultiGauss2D(sigmas=sigmas, norms=norms) m.normalize() containment = m.containment_fraction(rad) return containment
[docs] @staticmethod def evaluate_direct(rad, **kwargs): """Evaluate PSF model. Parameters ---------- rad : `~astropy.units.Quantity` Rad value. **kwargs : dict Parameters, see `required_parameters`. Returns ------- value : `~numpy.ndarray` PSF value. """ sigmas, norms = get_sigmas_and_norms(**kwargs) m = MultiGauss2D(sigmas=sigmas, norms=norms) m.normalize() return m(rad)
[docs] class PSFKing(ParametricPSF): """King profile analytical PSF depending on energy and offset. This PSF parametrisation and FITS data format is described here: :ref:`gadf:psf_king`. Parameters ---------- axes : list of `MapAxis` or `MapAxes` Data axes, required are ["energy_true", "offset"]. meta : dict Metadata dictionary. """ tag = "psf_king" required_axes = ["energy_true", "offset"] required_parameters = ["gamma", "sigma"] default_interp_kwargs = dict(bounds_error=False, fill_value=None)
[docs] @staticmethod def evaluate_containment(rad, gamma, sigma): """Containment of the PSF at given axes coordinates. Parameters ---------- rad : `~astropy.units.Quantity` Rad value. gamma : `~astropy.units.Quantity` Gamma parameter. sigma : `~astropy.units.Quantity` Sigma parameter. Returns ------- containment : `~numpy.ndarray` Containment. """ with np.errstate(divide="ignore", invalid="ignore"): powterm = 1 - gamma term = (1 + rad**2 / (2 * gamma * sigma**2)) ** powterm containment = 1 - term return containment
[docs] @staticmethod def evaluate_direct(rad, gamma, sigma): """Evaluate the PSF model. Formula is given here: :ref:`gadf:psf_king`. Parameters ---------- rad : `~astropy.coordinates.Angle` Offset from PSF center used for evaluating the PSF on a grid. Returns ------- psf_value : `~astropy.units.Quantity` PSF value. """ with np.errstate(divide="ignore"): term1 = 1 / (2 * np.pi * sigma**2) term2 = 1 - 1 / gamma term3 = (1 + rad**2 / (2 * gamma * sigma**2)) ** (-gamma) return term1 * term2 * term3