Source code for gammapy.irf.psf_table

# Licensed under a 3-clause BSD style license - see LICENSE.rst
from __future__ import absolute_import, division, print_function, unicode_literals
import logging
import numpy as np
from astropy.io import fits
from astropy.units import Quantity
from astropy.coordinates import Angle, SkyCoord
from ..utils.gauss import Gauss2DPDF
from ..utils.scripts import make_path
from ..utils.array import array_stats_str
from ..utils.energy import Energy

__all__ = ["TablePSF", "EnergyDependentTablePSF"]

log = logging.getLogger(__name__)

# Default PSF spline keyword arguments
# TODO: test and document
DEFAULT_PSF_SPLINE_KWARGS = dict(k=1, s=0)


[docs]class TablePSF(object): r"""Radially-symmetric table PSF. This PSF represents a :math:`PSF(r)=dP / d\Omega(r)` spline interpolation curve for a given set of offset :math:`r` and :math:`PSF` points. Uses `scipy.interpolate.UnivariateSpline`. Parameters ---------- rad : `~astropy.units.Quantity` with angle units Offset wrt source position dp_domega : `~astropy.units.Quantity` with sr^-1 units PSF value array spline_kwargs : dict Keyword arguments passed to `~scipy.interpolate.UnivariateSpline` Notes ----- * This PSF class works well for model PSFs of arbitrary shape (represented by a table), but might give unstable results if the PSF has noise. E.g. if ``dp_domega`` was estimated from histograms of real or simulated event data with finite statistics, it will have noise and it is your responsibility to check that the interpolating spline is reasonable. * To customize the spline, pass keyword arguments to `~scipy.interpolate.UnivariateSpline` in ``spline_kwargs``. E.g. passing ``dict(k=1)`` changes from the default cubic to linear interpolation. * TODO: evaluate spline for ``(log(rad), log(PSF))`` for numerical stability? * TODO: merge morphology.theta class functionality with this class. * TODO: add FITS I/O methods * TODO: add ``normalize`` argument to ``__init__`` with default ``True``? * TODO: ``__call__`` doesn't show up in the html API docs, but it should: https://github.com/astropy/astropy/pull/2135 """ def __init__(self, rad, dp_domega, spline_kwargs=DEFAULT_PSF_SPLINE_KWARGS): self._rad = Angle(rad).to("radian") self._dp_domega = Quantity(dp_domega).to("sr^-1") assert self._rad.ndim == self._dp_domega.ndim == 1 assert self._rad.shape == self._dp_domega.shape # Store input arrays as quantities in default internal units self._dp_dr = (2 * np.pi * self._rad * self._dp_domega).to("radian^-1") self._spline_kwargs = spline_kwargs self._compute_splines(spline_kwargs)
[docs] @classmethod def from_shape(cls, shape, width, rad): """Make TablePSF objects with commonly used shapes. This function is mostly useful for examples and testing. Parameters ---------- shape : {'disk', 'gauss'} PSF shape. width : `~astropy.units.Quantity` with angle units PSF width angle (radius for disk, sigma for Gauss). rad : `~astropy.units.Quantity` with angle units Offset angle Returns ------- psf : `TablePSF` Table PSF Examples -------- >>> import numpy as np >>> from astropy.coordinates import Angle >>> from gammapy.irf import TablePSF >>> TablePSF.from_shape(shape='gauss', width='0.2 deg', ... rad=Angle(np.linspace(0, 0.7, 100), 'deg')) """ width = Angle(width) rad = Angle(rad) if shape == "disk": amplitude = 1 / (np.pi * width.radian ** 2) psf_value = np.where(rad < width, amplitude, 0) elif shape == "gauss": gauss2d_pdf = Gauss2DPDF(sigma=width.radian) psf_value = gauss2d_pdf(rad.radian) else: raise ValueError("Invalid shape: {}".format(shape)) psf_value = Quantity(psf_value, "sr^-1") return cls(rad, psf_value)
[docs] def info(self): """Print basic info.""" ss = array_stats_str(self._rad.degree, "offset") ss += "integral = {}\n".format(self.integral()) for containment in [50, 68, 80, 95]: radius = self.containment_radius(0.01 * containment) ss += "containment radius {} deg for {}%\n".format( radius.degree, containment ) return ss
# TODO: remove because it's not flexible enough?
[docs] def __call__(self, lon, lat): """Evaluate PSF at a 2D position. The PSF is centered on ``(0, 0)``. Parameters ---------- lon, lat : `~astropy.coordinates.Angle` Longitude / latitude position Returns ------- psf_value : `~astropy.units.Quantity` PSF value """ center = SkyCoord(0, 0, unit="radian") point = SkyCoord(lon, lat) rad = center.separation(point) return self.evaluate(rad)
[docs] def evaluate(self, rad, quantity="dp_domega"): r"""Evaluate PSF. The following PSF quantities are available: * 'dp_domega': PDF per 2-dim solid angle :math:`\Omega` in sr^-1 .. math:: \frac{dP}{d\Omega} * 'dp_dr': PDF per 1-dim offset :math:`r` in radian^-1 .. math:: \frac{dP}{dr} = 2 \pi r \frac{dP}{d\Omega} Parameters ---------- rad : `~astropy.coordinates.Angle` Offset wrt source position quantity : {'dp_domega', 'dp_dr'} Which PSF quantity? Returns ------- psf_value : `~astropy.units.Quantity` PSF value """ rad = Angle(rad) shape = rad.shape x = np.array(rad.radian).flat if quantity == "dp_domega": y = self._dp_domega_spline(x) unit = "sr^-1" elif quantity == "dp_dr": y = self._dp_dr_spline(x) unit = "radian^-1" else: ss = "Invalid quantity: {}\n".format(quantity) ss += "Choose one of: 'dp_domega', 'dp_dr'" raise ValueError(ss) y = np.clip(a=y, a_min=0, a_max=None) return Quantity(y, unit).reshape(shape)
[docs] def integral(self, rad_min=None, rad_max=None): """Compute PSF integral, aka containment fraction. Parameters ---------- rad_min, rad_max : `~astropy.units.Quantity` with angle units Offset angle range Returns ------- integral : float PSF integral """ if rad_min is None: rad_min = self._rad[0] else: rad_min = Angle(rad_min) if rad_max is None: rad_max = self._rad[-1] else: rad_max = Angle(rad_max) rad_min = self._rad_clip(rad_min) rad_max = self._rad_clip(rad_max) cdf_min = self._cdf_spline(rad_min) cdf_max = self._cdf_spline(rad_max) return cdf_max - cdf_min
[docs] def containment_radius(self, fraction): """Containment radius. Parameters ---------- fraction : array_like Containment fraction (range 0 .. 1) Returns ------- rad : `~astropy.coordinates.Angle` Containment radius angle """ rad = self._ppf_spline(fraction) return Angle(rad, "radian").to("deg")
[docs] def normalize(self): """Normalize PSF to unit integral. Computes the total PSF integral via the :math:`dP / dr` spline and then divides the :math:`dP / dr` array. """ integral = self.integral() self._dp_dr /= integral # Clip to small positive number to avoid divide by 0 rad = np.clip(self._rad.radian, 1e-6, None) rad = Quantity(rad, "radian") self._dp_domega = self._dp_dr / (2 * np.pi * rad) self._compute_splines(self._spline_kwargs)
[docs] def broaden(self, factor, normalize=True): r"""Broaden PSF by scaling the offset array. For a broadening factor :math:`f` and the offset array :math:`r`, the offset array scaled in the following way: .. math:: r_{new} = f \times r_{old} \frac{dP}{dr}(r_{new}) = \frac{dP}{dr}(r_{old}) Parameters ---------- factor : float Broadening factor normalize : bool Normalize PSF after broadening """ self._rad *= factor # We define broadening such that self._dp_domega remains the same # so we only have to re-compute self._dp_dr and the slines here. self._dp_dr = (2 * np.pi * self._rad * self._dp_domega).to("radian^-1") self._compute_splines(self._spline_kwargs) if normalize: self.normalize()
[docs] def plot_psf_vs_rad(self, ax=None, quantity="dp_domega", **kwargs): """Plot PSF vs radius. TODO: describe PSF ``quantity`` argument in a central place and link to it from here. """ import matplotlib.pyplot as plt ax = plt.gca() if ax is None else ax x = self._rad.to("deg") y = self.evaluate(self._rad, quantity) ax.plot(x.value, y.value, **kwargs) ax.loglog() ax.set_xlabel("Radius ({})".format(x.unit)) ax.set_ylabel("PSF ({})".format(y.unit))
def _compute_splines(self, spline_kwargs=DEFAULT_PSF_SPLINE_KWARGS): """Compute two splines representing the PSF. * `_dp_domega_spline` is used to evaluate the 2D PSF. * `_dp_dr_spline` is not really needed for most applications, but is available via `eval`. * `_cdf_spline` is used to compute integral and for normalisation. * `_ppf_spline` is used to compute containment radii. """ from scipy.interpolate import UnivariateSpline # Compute spline and normalize. x, y = self._rad.value, self._dp_domega.value self._dp_domega_spline = UnivariateSpline(x, y, **spline_kwargs) x, y = self._rad.value, self._dp_dr.value self._dp_dr_spline = UnivariateSpline(x, y, **spline_kwargs) # We use the terminology for scipy.stats distributions # http://docs.scipy.org/doc/scipy/reference/tutorial/stats.html#common-methods # cdf = "cumulative distribution function" self._cdf_spline = self._dp_dr_spline.antiderivative() # ppf = "percent point function" (inverse of cdf) # Here's a discussion on methods to compute the ppf # http://mail.scipy.org/pipermail/scipy-user/2010-May/025237.html y = self._rad.value x = self.integral(Angle(0, "rad"), self._rad) # Since scipy 1.0 the UnivariateSpline requires that x is strictly increasing # So only keep nodes where this is the case (and always keep the first one): x, idx = np.unique(x, return_index=True) y = y[idx] # Dummy values, for cases where one really doesn't have a valid PSF. if len(x) < 4: x = [0, 1, 2, 3] y = [0, 0, 0, 0] self._ppf_spline = UnivariateSpline(x, y, **spline_kwargs) def _rad_clip(self, rad): """Clip to radius support range, because spline extrapolation is unstable.""" rad = Angle(rad, "radian").radian rad = np.clip(rad, 0, self._rad[-1].radian) return rad
[docs]class EnergyDependentTablePSF(object): """Energy-dependent radially-symmetric table PSF (``gtpsf`` format). TODO: add references and explanations. Parameters ---------- energy : `~astropy.units.Quantity` Energy (1-dim) rad : `~astropy.units.Quantity` with angle units Offset angle wrt source position (1-dim) exposure : `~astropy.units.Quantity` Exposure (1-dim) psf_value : `~astropy.units.Quantity` PSF (2-dim with axes: psf[energy_index, offset_index] """ def __init__(self, energy, rad, exposure=None, psf_value=None): self.energy = Quantity(energy).to("GeV") self.rad = Quantity(rad).to("radian") if exposure is None: self.exposure = Quantity(np.ones(len(energy)), "cm^2 s") else: self.exposure = Quantity(exposure).to("cm^2 s") if psf_value is None: self.psf_value = Quantity(np.zeros(len(energy), len(rad)), "sr^-1") else: self.psf_value = Quantity(psf_value).to("sr^-1") # Cache for TablePSF at each energy ... only computed when needed self._table_psf_cache = [None] * len(self.energy) def __str__(self): ss = "EnergyDependentTablePSF\n" ss += "-----------------------\n" ss += "\nAxis info:\n" ss += " " + array_stats_str(self.rad.to("deg"), "rad") ss += " " + array_stats_str(self.energy, "energy") # ss += ' ' + array_stats_str(self.exposure, 'exposure') # ss += 'integral = {}\n'.format(self.integral()) ss += "\nContainment info:\n" # Print some example containment radii fractions = [0.68, 0.95] energies = Quantity([10, 100], "GeV") for fraction in fractions: rads = self.containment_radius(energies=energies, fraction=fraction) for energy, rad in zip(energies, rads): ss += " " + "{}% containment radius at {:3.0f}: {:.2f}\n".format( 100 * fraction, energy, rad ) return ss
[docs] @classmethod def from_fits(cls, hdu_list): """Create `EnergyDependentTablePSF` from ``gtpsf`` format HDU list. Parameters ---------- hdu_list : `~astropy.io.fits.HDUList` HDU list with ``THETA`` and ``PSF`` extensions. """ rad = Angle(hdu_list["THETA"].data["Theta"], "deg") energy = Quantity(hdu_list["PSF"].data["Energy"], "MeV") exposure = Quantity(hdu_list["PSF"].data["Exposure"], "cm^2 s") psf_value = Quantity(hdu_list["PSF"].data["PSF"], "sr^-1") return cls(energy, rad, exposure, psf_value)
[docs] def to_fits(self): """Convert to FITS HDU list format. Returns ------- hdu_list : `~astropy.io.fits.HDUList` PSF in HDU list format. """ # TODO: write HEADER keywords as gtpsf data = self.rad theta_hdu = fits.BinTableHDU(data=data, name="Theta") data = [self.energy, self.exposure, self.psf_value] psf_hdu = fits.BinTableHDU(data=data, name="PSF") hdu_list = fits.HDUList([theta_hdu, psf_hdu]) return hdu_list
[docs] @classmethod def read(cls, filename): """Create `EnergyDependentTablePSF` from ``gtpsf``-format FITS file. Parameters ---------- filename : str File name """ filename = str(make_path(filename)) with fits.open(filename, memmap=False) as hdulist: psf = cls.from_fits(hdulist) return psf
[docs] def write(self, *args, **kwargs): """Write to FITS file. Calls `~astropy.io.fits.HDUList.writeto`, forwarding all arguments. """ self.to_fits().writeto(*args, **kwargs)
[docs] def evaluate(self, energy=None, rad=None, interp_kwargs=None): """Evaluate the PSF at a given energy and offset Parameters ---------- energy : `~astropy.units.Quantity` energy value rad : `~astropy.coordinates.Angle` Offset wrt source position interp_kwargs : dict option for interpolation for `~scipy.interpolate.RegularGridInterpolator` Returns ------- values : `~astropy.units.Quantity` Interpolated value """ if interp_kwargs is None: interp_kwargs = dict(bounds_error=False, fill_value=None) from scipy.interpolate import RegularGridInterpolator if energy is None: energy = self.energy if rad is None: rad = self.rad energy = Energy(energy).to("TeV") rad = Angle(rad).to("deg") energy_bin = self.energy.to("TeV") rad_bin = self.rad.to("deg") points = (energy_bin, rad_bin) interpolator = RegularGridInterpolator( points, self.psf_value.value, **interp_kwargs ) energy_grid, rad_grid = np.meshgrid(energy.value, rad.value, indexing="ij") shape = energy_grid.shape pix_coords = np.column_stack([energy_grid.flat, rad_grid.flat]) data_interp = interpolator(pix_coords) return Quantity(data_interp.reshape(shape), self.psf_value.unit)
[docs] def table_psf_at_energy(self, energy, interp_kwargs=None, **kwargs): """Create `~gammapy.irf.TablePSF` at one given energy. Parameters ---------- energy : `~astropy.units.Quantity` Energy interp_kwargs : dict Option for interpolation for `~scipy.interpolate.RegularGridInterpolator` Returns ------- psf : `~gammapy.irf.TablePSF` Table PSF """ psf_value = self.evaluate(energy, None, interp_kwargs)[0, :] return TablePSF(self.rad, psf_value, **kwargs)
[docs] def table_psf_in_energy_band( self, energy_band, spectral_index=2, spectrum=None, **kwargs ): """Average PSF in a given energy band. Expected counts in sub energy bands given the given exposure and spectrum are used as weights. Parameters ---------- energy_band : `~astropy.units.Quantity` Energy band spectral_index : float Power law spectral index (used if spectrum=None). spectrum : callable Spectrum (callable with energy as parameter). Returns ------- psf : `TablePSF` Table PSF """ if spectrum is None: # This is a false positive error from pylint # See https://github.com/PyCQA/pylint/issues/2410#issuecomment-415026690 def spectrum(energy): # pylint:disable=function-redefined return (energy / energy_band[0]) ** (-spectral_index) # TODO: warn if `energy_band` is outside available data. energy_idx_min, energy_idx_max = self._energy_index(energy_band) # TODO: improve this, probably by evaluating the PSF (i.e. interpolating in energy) onto a new energy grid # This is a bit of a hack, but makes sure that a PSF is given, by forcing at least one slice: if energy_idx_max - energy_idx_min < 2: # log.warning('Dubious case of PSF energy binning') # Note that below always range stop of `energy_idx_max - 1` is used! # That's why we put +2 here to make sure we have at least one bin. energy_idx_max = max(energy_idx_min + 2, energy_idx_max) # Make sure we don't step out of the energy array (doesn't help much) energy_idx_max = min(energy_idx_max, len(self.energy)) # TODO: extract this into a utility function `npred_weighted_mean()` # Compute weights for energy bins weights = np.zeros_like(self.energy.value, dtype=np.float64) for idx in range(energy_idx_min, energy_idx_max - 1): energy_min = self.energy[idx] energy_max = self.energy[idx + 1] exposure = self.exposure[idx] flux = spectrum(energy_min) weights[idx] = (exposure * flux * (energy_max - energy_min)).value # Normalize weights to sum to 1 weights = weights / weights.sum() # Compute weighted PSF value array total_psf_value = np.zeros_like(self._get_1d_psf_values(0), dtype=np.float64) for idx in range(energy_idx_min, energy_idx_max - 1): psf_value = self._get_1d_psf_values(idx) total_psf_value += weights[idx] * psf_value # TODO: add version that returns `total_psf_value` without # making a `TablePSF`. return TablePSF(self.rad, total_psf_value, **kwargs)
[docs] def containment_radius(self, energies, fraction, interp_kwargs=None): """Containment radius. Parameters ---------- energies : `~astropy.units.Quantity` Energy fraction : float Containment fraction in % Returns ------- rad : `~astropy.units.Quantity` Containment radius in deg """ # TODO: figure out if there's a more efficient implementation to support # arrays of energy energies = np.atleast_1d(energies) psfs = [self.table_psf_at_energy(energy, interp_kwargs) for energy in energies] rad = [psf.containment_radius(fraction) for psf in psfs] return Quantity(rad)
[docs] def integral(self, energy, rad_min, rad_max): """Containment fraction. Parameters ---------- energy : `~astropy.units.Quantity` Energy rad_min, rad_max : `~astropy.coordinates.Angle` Offset Returns ------- fraction : array_like Containment fraction (in range 0 .. 1) """ # TODO: useless at the moment ... support array inputs or remove! psf = self.table_psf_at_energy(energy) return psf.integral(rad_min, rad_max)
[docs] def info(self): """Print basic info""" print(str(self))
[docs] def plot_psf_vs_rad(self, energies=[1e4, 1e5, 1e6]): """Plot PSF vs radius. Parameters ---------- TODO """ import matplotlib.pyplot as plt plt.figure(figsize=(6, 4)) for energy in energies: energy_index = self._energy_index(energy) psf = self.psf_value[energy_index, :] label = "{} GeV".format(1e-3 * energy) x = np.hstack([-self.rad[::-1], self.rad]) y = 1e-6 * np.hstack([psf[::-1], psf]) plt.plot(x, y, lw=2, label=label) # plt.semilogy() # plt.loglog() plt.legend() plt.xlim(-0.2, 0.5) plt.xlabel("Offset (deg)") plt.ylabel("PSF (1e-6 sr^-1)") plt.tight_layout()
[docs] def plot_containment_vs_energy( self, ax=None, fractions=[0.63, 0.8, 0.95], **kwargs ): """Plot containment versus energy.""" import matplotlib.pyplot as plt ax = plt.gca() if ax is None else ax energy = Energy.equal_log_spacing(self.energy.min(), self.energy.max(), 10) for fraction in fractions: rad = self.containment_radius(energy, fraction) label = "{:.1f}% Containment".format(100 * fraction) ax.plot(energy.value, rad.value, label=label, **kwargs) ax.semilogx() ax.legend(loc="best") ax.set_xlabel("Energy (GeV)") ax.set_ylabel("Containment radius (deg)")
[docs] def plot_exposure_vs_energy(self): """Plot exposure versus energy.""" import matplotlib.pyplot as plt plt.figure(figsize=(4, 3)) plt.plot(self.energy, self.exposure, color="black", lw=3) plt.semilogx() plt.xlabel("Energy (MeV)") plt.ylabel("Exposure (cm^2 s)") plt.xlim(1e4 / 1.3, 1.3 * 1e6) plt.ylim(0, 1.5e11) plt.tight_layout()
def _energy_index(self, energy): """Find energy array index. """ # TODO: test with array input return np.searchsorted(self.energy, energy) def _get_1d_psf_values(self, energy_index): """Get 1-dim PSF value array. Parameters ---------- energy_index : int Energy index Returns ------- psf_values : `~astropy.units.Quantity` PSF value array """ psf_values = self.psf_value[energy_index, :].flatten().copy() # When the PSF Table is not filled (with nan), the psf estimation at a given energy crashes psf_values[np.isnan(psf_values)] = 0 return psf_values def _get_1d_table_psf(self, energy_index, **kwargs): """Get 1-dim TablePSF (cached). Parameters ---------- energy_index : int Energy index Returns ------- table_psf : `TablePSF` Table PSF """ # TODO: support array_like `energy_index` here? if self._table_psf_cache[energy_index] is None: psf_value = self._get_1d_psf_values(energy_index) table_psf = TablePSF(self.rad, psf_value, **kwargs) self._table_psf_cache[energy_index] = table_psf return self._table_psf_cache[energy_index]