Source code for gammapy.irf.psf_table

# Licensed under a 3-clause BSD style license - see LICENSE.rst
import logging
import numpy as np
import scipy.integrate
from astropy import units as u
from astropy.coordinates import Angle
from astropy.io import fits
from astropy.table import Table
from astropy.utils import lazyproperty
from gammapy.maps import MapAxis
from gammapy.utils.array import array_stats_str
from gammapy.utils.gauss import Gauss2DPDF
from gammapy.utils.interpolation import ScaledRegularGridInterpolator
from gammapy.utils.scripts import make_path

__all__ = ["TablePSF", "EnergyDependentTablePSF"]

log = logging.getLogger(__name__)


[docs]class TablePSF: """Radially-symmetric table PSF. Parameters ---------- rad_axis : `~astropy.units.Quantity` with angle units Offset wrt source position psf_value : `~astropy.units.Quantity` with sr^-1 units PSF value array interp_kwargs : dict Keyword arguments passed to `ScaledRegularGridInterpolator` """ def __init__(self, rad_axis, psf_value, interp_kwargs=None): assert rad_axis.name == "rad" self._rad_axis = rad_axis self.psf_value = u.Quantity(psf_value).to("sr^-1") self._interp_kwargs = interp_kwargs or {} @property def rad_axis(self): return self._rad_axis @lazyproperty def _interpolate(self): points = (self.rad_axis.center,) return ScaledRegularGridInterpolator( points=points, values=self.psf_value, **self._interp_kwargs ) @lazyproperty def _interpolate_containment(self): rad = self.rad_axis.center if rad[0] > 0: rad = rad.insert(0, 0) rad_drad = 2 * np.pi * rad * self.evaluate(rad) values = scipy.integrate.cumtrapz( rad_drad.to_value("rad-1"), rad.to_value("rad"), initial=0 ) return ScaledRegularGridInterpolator(points=(rad,), values=values, fill_value=1)
[docs] @classmethod def from_shape(cls, shape, width, rad): """Make TablePSF objects with commonly used shapes. This function is mostly useful for examples and testing. Parameters ---------- shape : {'disk', 'gauss'} PSF shape. width : `~astropy.units.Quantity` with angle units PSF width angle (radius for disk, sigma for Gauss). rad : `~astropy.units.Quantity` with angle units Offset angle Returns ------- psf : `TablePSF` Table PSF Examples -------- >>> import numpy as np >>> from astropy.coordinates import Angle >>> from gammapy.irf import TablePSF >>> rad = Angle(np.linspace(0, 0.7, 100), 'deg') >>> psf = TablePSF.from_shape(shape='gauss', width='0.2 deg', rad=rad) """ width = Angle(width) rad = Angle(rad) if shape == "disk": amplitude = 1 / (np.pi * width.radian ** 2) psf_value = np.where(rad < width, amplitude, 0) elif shape == "gauss": gauss2d_pdf = Gauss2DPDF(sigma=width.radian) psf_value = gauss2d_pdf(rad.radian) else: raise ValueError(f"Invalid shape: {shape}") psf_value = u.Quantity(psf_value, "sr^-1") rad_axis = MapAxis.from_nodes(rad, name="rad") return cls(rad_axis=rad_axis, psf_value=psf_value)
[docs] def info(self): """Print basic info.""" ss = array_stats_str(self.rad_axis.center, "offset") ss += f"integral = {self.containment(self.rad_axis.edges[-1])}\n" for containment in [68, 80, 95]: radius = self.containment_radius(0.01 * containment) ss += f"containment radius {radius.deg} deg for {containment}%\n" return ss
[docs] def evaluate(self, rad): r"""Evaluate PSF. The following PSF quantities are available: * 'dp_domega': PDF per 2-dim solid angle :math:`\Omega` in sr^-1 .. math:: \frac{dP}{d\Omega} Parameters ---------- rad : `~astropy.coordinates.Angle` Offset wrt source position Returns ------- psf_value : `~astropy.units.Quantity` PSF value """ rad = np.atleast_1d(u.Quantity(rad)) return self._interpolate((rad,))
[docs] def containment(self, rad_max): """Compute PSF containment fraction. Parameters ---------- rad_max : `~astropy.units.Quantity` Offset angle range Returns ------- integral : float PSF integral """ rad = np.atleast_1d(rad_max) return self._interpolate_containment((rad,))
[docs] def containment_radius(self, fraction): """Containment radius. Parameters ---------- fraction : array_like Containment fraction (range 0 .. 1) Returns ------- rad : `~astropy.coordinates.Angle` Containment radius angle """ # TODO: check whether starting rad_max = Angle( np.linspace(0 * u.deg, self.rad_axis.center[-1], 10 * self.rad_axis.nbin), "rad", ) containment = self.containment(rad_max=rad_max) fraction = np.atleast_1d(fraction) fraction_idx = np.argmin(np.abs(containment - fraction[:, np.newaxis]), axis=1) return rad_max[fraction_idx].to("deg")
[docs] def normalize(self): """Normalize PSF to unit integral. Computes the total PSF integral via the :math:`dP / dr` spline and then divides the :math:`dP / dr` array. """ integral = self.containment(self.rad_axis.edges[-1]) self.psf_value /= integral
[docs] def plot_psf_vs_rad(self, ax=None, **kwargs): """Plot PSF vs radius. Parameters ---------- ax : `` kwargs : dict Keyword arguments passed to `matplotlib.pyplot.plot` """ import matplotlib.pyplot as plt ax = plt.gca() if ax is None else ax ax.plot( self.rad_axis.center.to_value("deg"), self.psf_value.to_value("sr-1"), **kwargs, ) ax.set_yscale("log") ax.set_xlabel("Radius (deg)") ax.set_ylabel("PSF (sr-1)")
[docs]class EnergyDependentTablePSF: """Energy-dependent radially-symmetric table PSF (``gtpsf`` format). TODO: add references and explanations. Parameters ---------- energy_axis_true : `MapAxis` Energy axis rad_axis : `MapAxis` Offset angle wrt source position axis exposure : `~astropy.units.Quantity` Exposure (1-dim) psf_value : `~astropy.units.Quantity` PSF (2-dim with axes: psf[energy_index, offset_index] interp_kwargs : dict Interpolation keyword arguments pass to `ScaledRegularGridInterpolator`. """ def __init__( self, energy_axis_true, rad_axis, exposure=None, psf_value=None, interp_kwargs=None, ): self._rad_axis = rad_axis self._energy_axis_true = energy_axis_true assert energy_axis_true.name == "energy_true" assert rad_axis.name == "rad" if exposure is None: self.exposure = u.Quantity(np.ones(self.energy_axis_true.nbin), "cm^2 s") else: self.exposure = u.Quantity(exposure).to("cm^2 s") shape = (energy_axis_true.nbin, rad_axis.nbin) if psf_value is None: self.psf_value = np.zeros(shape) * u.Unit("sr^-1") else: assert np.shape(psf_value) == shape self.psf_value = u.Quantity(psf_value).to("sr^-1") self._interp_kwargs = interp_kwargs or {} @property def energy_axis_true(self): return self._energy_axis_true @property def rad_axis(self): return self._rad_axis @lazyproperty def _interpolate(self): points = (self.energy_axis_true.center, self.rad_axis.center) return ScaledRegularGridInterpolator( points=points, values=self.psf_value, **self._interp_kwargs ) @lazyproperty def _interpolate_containment(self): rad = self.rad_axis.center if rad[0] > 0: rad = rad.insert(0, 0) rad_drad = ( 2 * np.pi * rad * self.evaluate(energy=self.energy_axis_true.center, rad=rad) ) values = scipy.integrate.cumtrapz( rad_drad.to_value("rad-1"), rad.to_value("rad"), initial=0, axis=1 ) points = (self.energy_axis_true.center, rad) return ScaledRegularGridInterpolator(points=points, values=values, fill_value=1) def __str__(self): ss = "EnergyDependentTablePSF\n" ss += "-----------------------\n" ss += "\nAxis info:\n" ss += " " + array_stats_str(self.rad_axis.center.to("deg"), "rad") ss += " " + array_stats_str(self.energy_axis_true.center, "energy") ss += "\nContainment info:\n" # Print some example containment radii fractions = [0.68, 0.95] energies = u.Quantity([10, 100], "GeV") for fraction in fractions: rads = self.containment_radius(energy=energies, fraction=fraction) for energy, rad in zip(energies, rads): ss += f" {100 * fraction}% containment radius at {energy:3.0f}: {rad:.2f}\n" return ss
[docs] @classmethod def from_hdulist(cls, hdu_list): """Create `EnergyDependentTablePSF` from ``gtpsf`` format HDU list. Parameters ---------- hdu_list : `~astropy.io.fits.HDUList` HDU list with ``THETA`` and ``PSF`` extensions. """ # TODO: move this to MapAxis.from_table() rad = Angle(hdu_list["THETA"].data["Theta"], "deg") rad_axis = MapAxis.from_nodes(rad, name="rad") energy = u.Quantity(hdu_list["PSF"].data["Energy"], "MeV") energy_axis_true = MapAxis.from_nodes(energy, name="energy_true", interp="log") exposure = u.Quantity(hdu_list["PSF"].data["Exposure"], "cm^2 s") psf_value = u.Quantity(hdu_list["PSF"].data["PSF"], "sr^-1") return cls( energy_axis_true=energy_axis_true, rad_axis=rad_axis, exposure=exposure, psf_value=psf_value, )
[docs] def to_hdulist(self): """Convert to FITS HDU list format. Returns ------- hdu_list : `~astropy.io.fits.HDUList` PSF in HDU list format. """ # TODO: write HEADER keywords as gtpsf data = Table([self.rad_axis.center.to("deg")], names=["Theta"]) theta_hdu = fits.BinTableHDU(data=data, name="THETA") data = Table( [ self.energy_axis_true.center.to("MeV"), self.exposure.to("cm^2 s"), self.psf_value.to("sr^-1"), ], names=["Energy", "Exposure", "PSF"], ) psf_hdu = fits.BinTableHDU(data=data, name="PSF") hdu_list = fits.HDUList([fits.PrimaryHDU(), theta_hdu, psf_hdu]) return hdu_list
[docs] @classmethod def read(cls, filename): """Create `EnergyDependentTablePSF` from ``gtpsf``-format FITS file. Parameters ---------- filename : str File name """ with fits.open(str(make_path(filename)), memmap=False) as hdulist: return cls.from_hdulist(hdulist)
[docs] def write(self, filename, *args, **kwargs): """Write to FITS file. Calls `~astropy.io.fits.HDUList.writeto`, forwarding all arguments. """ self.to_hdulist().writeto(str(make_path(filename)), *args, **kwargs)
[docs] def evaluate(self, energy=None, rad=None, method="linear"): """Evaluate the PSF at a given energy and offset Parameters ---------- energy : `~astropy.units.Quantity` Energy value rad : `~astropy.coordinates.Angle` Offset wrt source position method : {"linear", "nearest"} Linear or nearest neighbour interpolation. Returns ------- values : `~astropy.units.Quantity` Interpolated value """ if energy is None: energy = self.energy_axis_true.center if rad is None: rad = self.rad_axis.center energy = u.Quantity(energy, ndmin=1)[:, np.newaxis] rad = u.Quantity(rad, ndmin=1) return self._interpolate((energy, rad), clip=True, method=method)
[docs] def table_psf_at_energy(self, energy, method="linear", **kwargs): """Create `~gammapy.irf.TablePSF` at one given energy. Parameters ---------- energy : `~astropy.units.Quantity` Energy method : {"linear", "nearest"} Linear or nearest neighbour interpolation. Returns ------- psf : `~gammapy.irf.TablePSF` Table PSF """ psf_value = self.evaluate(energy=energy, method=method)[0, :] return TablePSF(rad_axis=self.rad_axis, psf_value=psf_value, **kwargs)
[docs] def table_psf_in_energy_range( self, energy_range, spectrum=None, n_bins=11, **kwargs ): """Average PSF in a given energy band. Expected counts in sub energy bands given the given exposure and spectrum are used as weights. Parameters ---------- energy_range : `~astropy.units.Quantity` Energy band spectrum : `~gammapy.modeling.models.SpectralModel` Spectral model used for weighting the PSF. Default is a power law with index=2. n_bins : int Number of energy points in the energy band, used to compute the weigthed PSF. Returns ------- psf : `TablePSF` Table PSF """ from gammapy.modeling.models import PowerLawSpectralModel, TemplateSpectralModel if spectrum is None: spectrum = PowerLawSpectralModel() exposure = TemplateSpectralModel(self.energy_axis_true.center, self.exposure) e_min, e_max = energy_range energy = MapAxis.from_energy_bounds(e_min, e_max, n_bins).edges weights = spectrum(energy) * exposure(energy) weights /= weights.sum() psf_value = self.evaluate(energy=energy) psf_value_weighted = weights[:, np.newaxis] * psf_value return TablePSF(self.rad_axis, psf_value_weighted.sum(axis=0), **kwargs)
[docs] def containment_radius(self, energy, fraction=0.68): """Containment radius. Parameters ---------- energy : `~astropy.units.Quantity` Energy fraction : float Containment fraction. Returns ------- rad : `~astropy.units.Quantity` Containment radius in deg """ # upsamle for better precision rad_max = Angle(self.rad_axis.upsample(factor=10).center) containment = self.containment(energy=energy, rad_max=rad_max) # find nearest containment value fraction_idx = np.argmin(np.abs(containment - fraction), axis=1) return rad_max[fraction_idx].to("deg")
[docs] def containment(self, energy, rad_max): """Compute containment of the PSF. Parameters ---------- energy : `~astropy.units.Quantity` Energy rad_max : `~astropy.coordinates.Angle` Maximum offset angle. Returns ------- fraction : array_like Containment fraction (in range 0 .. 1) """ energy = np.atleast_1d(u.Quantity(energy))[:, np.newaxis] rad_max = np.atleast_1d(u.Quantity(rad_max)) return self._interpolate_containment((energy, rad_max))
[docs] def info(self): """Print basic info""" print(str(self))
[docs] def plot_psf_vs_rad(self, energies=None, ax=None, **kwargs): """Plot PSF vs radius. Parameters ---------- energy : `~astropy.units.Quantity` Energies where to plot the PSF. **kwargs : dict Keyword arguments pass to `~matplotlib.pyplot.plot`. """ import matplotlib.pyplot as plt if energies is None: energies = [100, 1000, 10000] * u.GeV ax = plt.gca() if ax is None else ax for energy in energies: psf_value = np.squeeze(self.evaluate(energy=energy)) label = f"{energy:.0f}" ax.plot( self.rad_axis.center.to_value("deg"), psf_value.to_value("sr-1"), label=label, **kwargs, ) ax.set_yscale("log") ax.set_xlabel("Offset (deg)") ax.set_ylabel("PSF (1 / sr)") plt.legend() return ax
[docs] def plot_containment_vs_energy( self, ax=None, fractions=[0.68, 0.8, 0.95], **kwargs ): """Plot containment versus energy.""" import matplotlib.pyplot as plt ax = plt.gca() if ax is None else ax for fraction in fractions: rad = self.containment_radius(self.energy_axis_true.center, fraction) label = f"{100 * fraction:.1f}% Containment" ax.plot( self.energy_axis_true.center.to("GeV").value, rad.to("deg").value, label=label, **kwargs, ) ax.semilogx() ax.legend(loc="best") ax.set_xlabel("Energy (GeV)") ax.set_ylabel("Containment radius (deg)")
[docs] def plot_exposure_vs_energy(self): """Plot exposure versus energy.""" import matplotlib.pyplot as plt plt.figure(figsize=(4, 3)) plt.plot(self.energy_axis_true.center, self.exposure, color="black", lw=3) plt.semilogx() plt.xlabel("Energy (MeV)") plt.ylabel("Exposure (cm^2 s)") plt.xlim(1e4 / 1.3, 1.3 * 1e6) plt.ylim(0, 1.5e11) plt.tight_layout()
[docs] def stack(self, psf): """Stack two EnergyDependentTablePSF objects.s Parameters ---------- psf : `EnergyDependentTablePSF` PSF to stack. Returns ------- stacked_psf : `EnergyDependentTablePSF` Stacked PSF. """ exposure = self.exposure + psf.exposure psf_value = self.psf_value.T * self.exposure + psf.psf_value.T * psf.exposure with np.errstate(invalid="ignore"): # exposure can be zero psf_value = np.nan_to_num(psf_value / exposure) return self.__class__( energy_axis_true=self.energy_axis_true, rad_axis=self.rad_axis, psf_value=psf_value.T, exposure=exposure, )