# Licensed under a 3-clause BSD style license - see LICENSE.rst
from __future__ import absolute_import, division, print_function, unicode_literals
import logging
import numpy as np
from scipy.interpolate import UnivariateSpline, RegularGridInterpolator
from astropy.io import fits
from astropy.units import Quantity
from astropy.coordinates import Angle, SkyCoord
from ..utils.gauss import Gauss2DPDF
from ..utils.scripts import make_path
from ..utils.array import array_stats_str
from ..utils.energy import Energy
__all__ = ["TablePSF", "EnergyDependentTablePSF"]
log = logging.getLogger(__name__)
# Default PSF spline keyword arguments
# TODO: test and document
DEFAULT_PSF_SPLINE_KWARGS = dict(k=1, s=0)
[docs]class TablePSF(object):
r"""Radially-symmetric table PSF.
This PSF represents a :math:`PSF(r)=dP / d\Omega(r)`
spline interpolation curve for a given set of offset :math:`r`
and :math:`PSF` points.
Uses `scipy.interpolate.UnivariateSpline`.
Parameters
----------
rad : `~astropy.units.Quantity` with angle units
Offset wrt source position
dp_domega : `~astropy.units.Quantity` with sr^-1 units
PSF value array
spline_kwargs : dict
Keyword arguments passed to `~scipy.interpolate.UnivariateSpline`
Notes
-----
* This PSF class works well for model PSFs of arbitrary shape (represented by a table),
but might give unstable results if the PSF has noise.
E.g. if ``dp_domega`` was estimated from histograms of real or simulated event data
with finite statistics, it will have noise and it is your responsibility
to check that the interpolating spline is reasonable.
* To customize the spline, pass keyword arguments to `~scipy.interpolate.UnivariateSpline`
in ``spline_kwargs``. E.g. passing ``dict(k=1)`` changes from the default cubic to
linear interpolation.
* TODO: evaluate spline for ``(log(rad), log(PSF))`` for numerical stability?
* TODO: merge morphology.theta class functionality with this class.
* TODO: add FITS I/O methods
* TODO: add ``normalize`` argument to ``__init__`` with default ``True``?
* TODO: ``__call__`` doesn't show up in the html API docs, but it should:
https://github.com/astropy/astropy/pull/2135
"""
def __init__(self, rad, dp_domega, spline_kwargs=DEFAULT_PSF_SPLINE_KWARGS):
self._rad = Angle(rad).to("radian")
self._dp_domega = Quantity(dp_domega).to("sr^-1")
assert self._rad.ndim == self._dp_domega.ndim == 1
assert self._rad.shape == self._dp_domega.shape
# Store input arrays as quantities in default internal units
self._dp_dr = (2 * np.pi * self._rad * self._dp_domega).to("radian^-1")
self._spline_kwargs = spline_kwargs
self._compute_splines(spline_kwargs)
[docs] @classmethod
def from_shape(cls, shape, width, rad):
"""Make TablePSF objects with commonly used shapes.
This function is mostly useful for examples and testing.
Parameters
----------
shape : {'disk', 'gauss'}
PSF shape.
width : `~astropy.units.Quantity` with angle units
PSF width angle (radius for disk, sigma for Gauss).
rad : `~astropy.units.Quantity` with angle units
Offset angle
Returns
-------
psf : `TablePSF`
Table PSF
Examples
--------
>>> import numpy as np
>>> from astropy.coordinates import Angle
>>> from gammapy.irf import TablePSF
>>> TablePSF.from_shape(shape='gauss', width='0.2 deg',
... rad=Angle(np.linspace(0, 0.7, 100), 'deg'))
"""
width = Angle(width)
rad = Angle(rad)
if shape == "disk":
amplitude = 1 / (np.pi * width.radian ** 2)
psf_value = np.where(rad < width, amplitude, 0)
elif shape == "gauss":
gauss2d_pdf = Gauss2DPDF(sigma=width.radian)
psf_value = gauss2d_pdf(rad.radian)
else:
raise ValueError("Invalid shape: {}".format(shape))
psf_value = Quantity(psf_value, "sr^-1")
return cls(rad, psf_value)
[docs] def info(self):
"""Print basic info."""
ss = array_stats_str(self._rad.degree, "offset")
ss += "integral = {}\n".format(self.integral())
for containment in [50, 68, 80, 95]:
radius = self.containment_radius(0.01 * containment)
ss += "containment radius {} deg for {}%\n".format(
radius.degree, containment
)
return ss
# TODO: remove because it's not flexible enough?
[docs] def __call__(self, lon, lat):
"""Evaluate PSF at a 2D position.
The PSF is centered on ``(0, 0)``.
Parameters
----------
lon, lat : `~astropy.coordinates.Angle`
Longitude / latitude position
Returns
-------
psf_value : `~astropy.units.Quantity`
PSF value
"""
center = SkyCoord(0, 0, unit="radian")
point = SkyCoord(lon, lat)
rad = center.separation(point)
return self.evaluate(rad)
[docs] def evaluate(self, rad, quantity="dp_domega"):
r"""Evaluate PSF.
The following PSF quantities are available:
* 'dp_domega': PDF per 2-dim solid angle :math:`\Omega` in sr^-1
.. math:: \frac{dP}{d\Omega}
* 'dp_dr': PDF per 1-dim offset :math:`r` in radian^-1
.. math:: \frac{dP}{dr} = 2 \pi r \frac{dP}{d\Omega}
Parameters
----------
rad : `~astropy.coordinates.Angle`
Offset wrt source position
quantity : {'dp_domega', 'dp_dr'}
Which PSF quantity?
Returns
-------
psf_value : `~astropy.units.Quantity`
PSF value
"""
rad = Angle(rad)
shape = rad.shape
x = np.array(rad.radian).flat
if quantity == "dp_domega":
y = self._dp_domega_spline(x)
unit = "sr^-1"
elif quantity == "dp_dr":
y = self._dp_dr_spline(x)
unit = "radian^-1"
else:
ss = "Invalid quantity: {}\n".format(quantity)
ss += "Choose one of: 'dp_domega', 'dp_dr'"
raise ValueError(ss)
y = np.clip(a=y, a_min=0, a_max=None)
return Quantity(y, unit).reshape(shape)
[docs] def integral(self, rad_min=None, rad_max=None):
"""Compute PSF integral, aka containment fraction.
Parameters
----------
rad_min, rad_max : `~astropy.units.Quantity` with angle units
Offset angle range
Returns
-------
integral : float
PSF integral
"""
if rad_min is None:
rad_min = self._rad[0]
else:
rad_min = Angle(rad_min)
if rad_max is None:
rad_max = self._rad[-1]
else:
rad_max = Angle(rad_max)
rad_min = self._rad_clip(rad_min)
rad_max = self._rad_clip(rad_max)
cdf_min = self._cdf_spline(rad_min)
cdf_max = self._cdf_spline(rad_max)
return cdf_max - cdf_min
[docs] def containment_radius(self, fraction):
"""Containment radius.
Parameters
----------
fraction : array_like
Containment fraction (range 0 .. 1)
Returns
-------
rad : `~astropy.coordinates.Angle`
Containment radius angle
"""
rad = self._ppf_spline(fraction)
return Angle(rad, "radian").to("deg")
[docs] def normalize(self):
"""Normalize PSF to unit integral.
Computes the total PSF integral via the :math:`dP / dr` spline
and then divides the :math:`dP / dr` array.
"""
integral = self.integral()
self._dp_dr /= integral
# Clip to small positive number to avoid divide by 0
rad = np.clip(self._rad.radian, 1e-6, None)
rad = Quantity(rad, "radian")
self._dp_domega = self._dp_dr / (2 * np.pi * rad)
self._compute_splines(self._spline_kwargs)
[docs] def broaden(self, factor, normalize=True):
r"""Broaden PSF by scaling the offset array.
For a broadening factor :math:`f` and the offset
array :math:`r`, the offset array scaled
in the following way:
.. math::
r_{new} = f \times r_{old}
\frac{dP}{dr}(r_{new}) = \frac{dP}{dr}(r_{old})
Parameters
----------
factor : float
Broadening factor
normalize : bool
Normalize PSF after broadening
"""
self._rad *= factor
# We define broadening such that self._dp_domega remains the same
# so we only have to re-compute self._dp_dr and the slines here.
self._dp_dr = (2 * np.pi * self._rad * self._dp_domega).to("radian^-1")
self._compute_splines(self._spline_kwargs)
if normalize:
self.normalize()
[docs] def plot_psf_vs_rad(self, ax=None, quantity="dp_domega", **kwargs):
"""Plot PSF vs radius.
TODO: describe PSF ``quantity`` argument in a central place and link to it from here.
"""
import matplotlib.pyplot as plt
ax = plt.gca() if ax is None else ax
x = self._rad.to("deg")
y = self.evaluate(self._rad, quantity)
ax.plot(x.value, y.value, **kwargs)
ax.loglog()
ax.set_xlabel("Radius ({})".format(x.unit))
ax.set_ylabel("PSF ({})".format(y.unit))
def _compute_splines(self, spline_kwargs=DEFAULT_PSF_SPLINE_KWARGS):
"""Compute two splines representing the PSF.
* `_dp_domega_spline` is used to evaluate the 2D PSF.
* `_dp_dr_spline` is not really needed for most applications,
but is available via `eval`.
* `_cdf_spline` is used to compute integral and for normalisation.
* `_ppf_spline` is used to compute containment radii.
"""
# Compute spline and normalize.
x, y = self._rad.value, self._dp_domega.value
self._dp_domega_spline = UnivariateSpline(x, y, **spline_kwargs)
x, y = self._rad.value, self._dp_dr.value
self._dp_dr_spline = UnivariateSpline(x, y, **spline_kwargs)
# We use the terminology for scipy.stats distributions
# http://docs.scipy.org/doc/scipy/reference/tutorial/stats.html#common-methods
# cdf = "cumulative distribution function"
self._cdf_spline = self._dp_dr_spline.antiderivative()
# ppf = "percent point function" (inverse of cdf)
# Here's a discussion on methods to compute the ppf
# http://mail.scipy.org/pipermail/scipy-user/2010-May/025237.html
y = self._rad.value
x = self.integral(Angle(0, "rad"), self._rad)
# Since scipy 1.0 the UnivariateSpline requires that x is strictly increasing
# So only keep nodes where this is the case (and always keep the first one):
x, idx = np.unique(x, return_index=True)
y = y[idx]
# Dummy values, for cases where one really doesn't have a valid PSF.
if len(x) < 4:
x = [0, 1, 2, 3]
y = [0, 0, 0, 0]
self._ppf_spline = UnivariateSpline(x, y, **spline_kwargs)
def _rad_clip(self, rad):
"""Clip to radius support range, because spline extrapolation is unstable."""
rad = Angle(rad, "radian").radian
rad = np.clip(rad, 0, self._rad[-1].radian)
return rad
[docs]class EnergyDependentTablePSF(object):
"""Energy-dependent radially-symmetric table PSF (``gtpsf`` format).
TODO: add references and explanations.
Parameters
----------
energy : `~astropy.units.Quantity`
Energy (1-dim)
rad : `~astropy.units.Quantity` with angle units
Offset angle wrt source position (1-dim)
exposure : `~astropy.units.Quantity`
Exposure (1-dim)
psf_value : `~astropy.units.Quantity`
PSF (2-dim with axes: psf[energy_index, offset_index]
"""
def __init__(self, energy, rad, exposure=None, psf_value=None):
self.energy = Quantity(energy).to("GeV")
self.rad = Quantity(rad).to("radian")
if exposure is None:
self.exposure = Quantity(np.ones(len(energy)), "cm^2 s")
else:
self.exposure = Quantity(exposure).to("cm^2 s")
if psf_value is None:
self.psf_value = Quantity(np.zeros(len(energy), len(rad)), "sr^-1")
else:
self.psf_value = Quantity(psf_value).to("sr^-1")
# Cache for TablePSF at each energy ... only computed when needed
self._table_psf_cache = [None] * len(self.energy)
def __str__(self):
ss = "EnergyDependentTablePSF\n"
ss += "-----------------------\n"
ss += "\nAxis info:\n"
ss += " " + array_stats_str(self.rad.to("deg"), "rad")
ss += " " + array_stats_str(self.energy, "energy")
# ss += ' ' + array_stats_str(self.exposure, 'exposure')
# ss += 'integral = {}\n'.format(self.integral())
ss += "\nContainment info:\n"
# Print some example containment radii
fractions = [0.68, 0.95]
energies = Quantity([10, 100], "GeV")
for fraction in fractions:
rads = self.containment_radius(energies=energies, fraction=fraction)
for energy, rad in zip(energies, rads):
ss += " " + "{}% containment radius at {:3.0f}: {:.2f}\n".format(
100 * fraction, energy, rad
)
return ss
[docs] @classmethod
def from_fits(cls, hdu_list):
"""Create `EnergyDependentTablePSF` from ``gtpsf`` format HDU list.
Parameters
----------
hdu_list : `~astropy.io.fits.HDUList`
HDU list with ``THETA`` and ``PSF`` extensions.
"""
rad = Angle(hdu_list["THETA"].data["Theta"], "deg")
energy = Quantity(hdu_list["PSF"].data["Energy"], "MeV")
exposure = Quantity(hdu_list["PSF"].data["Exposure"], "cm^2 s")
psf_value = Quantity(hdu_list["PSF"].data["PSF"], "sr^-1")
return cls(energy, rad, exposure, psf_value)
[docs] def to_fits(self):
"""Convert to FITS HDU list format.
Returns
-------
hdu_list : `~astropy.io.fits.HDUList`
PSF in HDU list format.
"""
# TODO: write HEADER keywords as gtpsf
data = self.rad
theta_hdu = fits.BinTableHDU(data=data, name="Theta")
data = [self.energy, self.exposure, self.psf_value]
psf_hdu = fits.BinTableHDU(data=data, name="PSF")
hdu_list = fits.HDUList([theta_hdu, psf_hdu])
return hdu_list
[docs] @classmethod
def read(cls, filename):
"""Create `EnergyDependentTablePSF` from ``gtpsf``-format FITS file.
Parameters
----------
filename : str
File name
"""
filename = str(make_path(filename))
with fits.open(filename, memmap=False) as hdulist:
psf = cls.from_fits(hdulist)
return psf
[docs] def write(self, *args, **kwargs):
"""Write to FITS file.
Calls `~astropy.io.fits.HDUList.writeto`, forwarding all arguments.
"""
self.to_fits().writeto(*args, **kwargs)
[docs] def evaluate(self, energy=None, rad=None, interp_kwargs=None):
"""Evaluate the PSF at a given energy and offset
Parameters
----------
energy : `~astropy.units.Quantity`
energy value
rad : `~astropy.coordinates.Angle`
Offset wrt source position
interp_kwargs : dict
option for interpolation for `~scipy.interpolate.RegularGridInterpolator`
Returns
-------
values : `~astropy.units.Quantity`
Interpolated value
"""
if interp_kwargs is None:
interp_kwargs = dict(bounds_error=False, fill_value=None)
if energy is None:
energy = self.energy
if rad is None:
rad = self.rad
energy = Energy(energy).to("TeV")
rad = Angle(rad).to("deg")
energy_bin = self.energy.to("TeV")
rad_bin = self.rad.to("deg")
points = (energy_bin, rad_bin)
interpolator = RegularGridInterpolator(
points, self.psf_value.value, **interp_kwargs
)
energy_grid, rad_grid = np.meshgrid(energy.value, rad.value, indexing="ij")
shape = energy_grid.shape
pix_coords = np.column_stack([energy_grid.flat, rad_grid.flat])
data_interp = interpolator(pix_coords)
return Quantity(data_interp.reshape(shape), self.psf_value.unit)
[docs] def table_psf_at_energy(self, energy, interp_kwargs=None, **kwargs):
"""Create `~gammapy.irf.TablePSF` at one given energy.
Parameters
----------
energy : `~astropy.units.Quantity`
Energy
interp_kwargs : dict
Option for interpolation for `~scipy.interpolate.RegularGridInterpolator`
Returns
-------
psf : `~gammapy.irf.TablePSF`
Table PSF
"""
psf_value = self.evaluate(energy, None, interp_kwargs)[0, :]
return TablePSF(self.rad, psf_value, **kwargs)
[docs] def table_psf_in_energy_band(
self, energy_band, spectral_index=2, spectrum=None, **kwargs
):
"""Average PSF in a given energy band.
Expected counts in sub energy bands given the given exposure
and spectrum are used as weights.
Parameters
----------
energy_band : `~astropy.units.Quantity`
Energy band
spectral_index : float
Power law spectral index (used if spectrum=None).
spectrum : callable
Spectrum (callable with energy as parameter).
Returns
-------
psf : `TablePSF`
Table PSF
"""
if spectrum is None:
# This is a false positive error from pylint
# See https://github.com/PyCQA/pylint/issues/2410#issuecomment-415026690
def spectrum(energy): # pylint:disable=function-redefined
return (energy / energy_band[0]) ** (-spectral_index)
# TODO: warn if `energy_band` is outside available data.
energy_idx_min, energy_idx_max = self._energy_index(energy_band)
# TODO: improve this, probably by evaluating the PSF (i.e. interpolating in energy) onto a new energy grid
# This is a bit of a hack, but makes sure that a PSF is given, by forcing at least one slice:
if energy_idx_max - energy_idx_min < 2:
# log.warning('Dubious case of PSF energy binning')
# Note that below always range stop of `energy_idx_max - 1` is used!
# That's why we put +2 here to make sure we have at least one bin.
energy_idx_max = max(energy_idx_min + 2, energy_idx_max)
# Make sure we don't step out of the energy array (doesn't help much)
energy_idx_max = min(energy_idx_max, len(self.energy))
# TODO: extract this into a utility function `npred_weighted_mean()`
# Compute weights for energy bins
weights = np.zeros_like(self.energy.value, dtype=np.float64)
for idx in range(energy_idx_min, energy_idx_max - 1):
energy_min = self.energy[idx]
energy_max = self.energy[idx + 1]
exposure = self.exposure[idx]
flux = spectrum(energy_min)
weights[idx] = (exposure * flux * (energy_max - energy_min)).value
# Normalize weights to sum to 1
weights = weights / weights.sum()
# Compute weighted PSF value array
total_psf_value = np.zeros_like(self._get_1d_psf_values(0), dtype=np.float64)
for idx in range(energy_idx_min, energy_idx_max - 1):
psf_value = self._get_1d_psf_values(idx)
total_psf_value += weights[idx] * psf_value
# TODO: add version that returns `total_psf_value` without
# making a `TablePSF`.
return TablePSF(self.rad, total_psf_value, **kwargs)
[docs] def containment_radius(self, energies, fraction, interp_kwargs=None):
"""Containment radius.
Parameters
----------
energies : `~astropy.units.Quantity`
Energy
fraction : float
Containment fraction in %
Returns
-------
rad : `~astropy.units.Quantity`
Containment radius in deg
"""
# TODO: figure out if there's a more efficient implementation to support
# arrays of energy
energies = np.atleast_1d(energies)
psfs = [self.table_psf_at_energy(energy, interp_kwargs) for energy in energies]
rad = [psf.containment_radius(fraction) for psf in psfs]
return Quantity(rad)
[docs] def integral(self, energy, rad_min, rad_max):
"""Containment fraction.
Parameters
----------
energy : `~astropy.units.Quantity`
Energy
rad_min, rad_max : `~astropy.coordinates.Angle`
Offset
Returns
-------
fraction : array_like
Containment fraction (in range 0 .. 1)
"""
# TODO: useless at the moment ... support array inputs or remove!
psf = self.table_psf_at_energy(energy)
return psf.integral(rad_min, rad_max)
[docs] def info(self):
"""Print basic info"""
print(str(self))
[docs] def plot_psf_vs_rad(self, energies=[1e4, 1e5, 1e6]):
"""Plot PSF vs radius.
Parameters
----------
TODO
"""
import matplotlib.pyplot as plt
plt.figure(figsize=(6, 4))
for energy in energies:
energy_index = self._energy_index(energy)
psf = self.psf_value[energy_index, :]
label = "{} GeV".format(1e-3 * energy)
x = np.hstack([-self.rad[::-1], self.rad])
y = 1e-6 * np.hstack([psf[::-1], psf])
plt.plot(x, y, lw=2, label=label)
# plt.semilogy()
# plt.loglog()
plt.legend()
plt.xlim(-0.2, 0.5)
plt.xlabel("Offset (deg)")
plt.ylabel("PSF (1e-6 sr^-1)")
plt.tight_layout()
[docs] def plot_containment_vs_energy(
self, ax=None, fractions=[0.63, 0.8, 0.95], **kwargs
):
"""Plot containment versus energy."""
import matplotlib.pyplot as plt
ax = plt.gca() if ax is None else ax
energy = Energy.equal_log_spacing(self.energy.min(), self.energy.max(), 10)
for fraction in fractions:
rad = self.containment_radius(energy, fraction)
label = "{:.1f}% Containment".format(100 * fraction)
ax.plot(energy.value, rad.value, label=label, **kwargs)
ax.semilogx()
ax.legend(loc="best")
ax.set_xlabel("Energy (GeV)")
ax.set_ylabel("Containment radius (deg)")
[docs] def plot_exposure_vs_energy(self):
"""Plot exposure versus energy."""
import matplotlib.pyplot as plt
plt.figure(figsize=(4, 3))
plt.plot(self.energy, self.exposure, color="black", lw=3)
plt.semilogx()
plt.xlabel("Energy (MeV)")
plt.ylabel("Exposure (cm^2 s)")
plt.xlim(1e4 / 1.3, 1.3 * 1e6)
plt.ylim(0, 1.5e11)
plt.tight_layout()
def _energy_index(self, energy):
"""Find energy array index.
"""
# TODO: test with array input
return np.searchsorted(self.energy, energy)
def _get_1d_psf_values(self, energy_index):
"""Get 1-dim PSF value array.
Parameters
----------
energy_index : int
Energy index
Returns
-------
psf_values : `~astropy.units.Quantity`
PSF value array
"""
psf_values = self.psf_value[energy_index, :].flatten().copy()
# When the PSF Table is not filled (with nan), the psf estimation at a given energy crashes
psf_values[np.isnan(psf_values)] = 0
return psf_values
def _get_1d_table_psf(self, energy_index, **kwargs):
"""Get 1-dim TablePSF (cached).
Parameters
----------
energy_index : int
Energy index
Returns
-------
table_psf : `TablePSF`
Table PSF
"""
# TODO: support array_like `energy_index` here?
if self._table_psf_cache[energy_index] is None:
psf_value = self._get_1d_psf_values(energy_index)
table_psf = TablePSF(self.rad, psf_value, **kwargs)
self._table_psf_cache[energy_index] = table_psf
return self._table_psf_cache[energy_index]