# Licensed under a 3-clause BSD style license - see LICENSE.rst
from __future__ import absolute_import, division, print_function, unicode_literals
import logging
import numpy as np
from scipy.interpolate import UnivariateSpline
from astropy.io import fits
from astropy.units import Quantity
from astropy.coordinates import Angle, SkyCoord
from ..utils.interpolation import ScaledRegularGridInterpolator
from ..utils.gauss import Gauss2DPDF
from ..utils.scripts import make_path
from ..utils.array import array_stats_str
from ..utils.energy import Energy
__all__ = ["TablePSF", "EnergyDependentTablePSF"]
log = logging.getLogger(__name__)
# Default PSF spline keyword arguments
# TODO: test and document
DEFAULT_PSF_SPLINE_KWARGS = dict(k=1, s=0)
[docs]class TablePSF(object):
r"""Radially-symmetric table PSF.
This PSF represents a :math:`PSF(r)=dP / d\Omega(r)`
spline interpolation curve for a given set of offset :math:`r`
and :math:`PSF` points.
Uses `scipy.interpolate.UnivariateSpline`.
Parameters
----------
rad : `~astropy.units.Quantity` with angle units
Offset wrt source position
dp_domega : `~astropy.units.Quantity` with sr^-1 units
PSF value array
spline_kwargs : dict
Keyword arguments passed to `~scipy.interpolate.UnivariateSpline`
Notes
-----
* This PSF class works well for model PSFs of arbitrary shape (represented by a table),
but might give unstable results if the PSF has noise.
E.g. if ``dp_domega`` was estimated from histograms of real or simulated event data
with finite statistics, it will have noise and it is your responsibility
to check that the interpolating spline is reasonable.
* To customize the spline, pass keyword arguments to `~scipy.interpolate.UnivariateSpline`
in ``spline_kwargs``. E.g. passing ``dict(k=1)`` changes from the default cubic to
linear interpolation.
* TODO: evaluate spline for ``(log(rad), log(PSF))`` for numerical stability?
* TODO: merge morphology.theta class functionality with this class.
* TODO: add FITS I/O methods
* TODO: add ``normalize`` argument to ``__init__`` with default ``True``?
* TODO: ``__call__`` doesn't show up in the html API docs, but it should:
https://github.com/astropy/astropy/pull/2135
"""
def __init__(self, rad, dp_domega, spline_kwargs=DEFAULT_PSF_SPLINE_KWARGS):
self._rad = Angle(rad).to("radian")
self._dp_domega = Quantity(dp_domega).to("sr^-1")
assert self._rad.ndim == self._dp_domega.ndim == 1
assert self._rad.shape == self._dp_domega.shape
# Store input arrays as quantities in default internal units
self._dp_dr = (2 * np.pi * self._rad * self._dp_domega).to("radian^-1")
self._spline_kwargs = spline_kwargs
self._compute_splines(spline_kwargs)
[docs] @classmethod
def from_shape(cls, shape, width, rad):
"""Make TablePSF objects with commonly used shapes.
This function is mostly useful for examples and testing.
Parameters
----------
shape : {'disk', 'gauss'}
PSF shape.
width : `~astropy.units.Quantity` with angle units
PSF width angle (radius for disk, sigma for Gauss).
rad : `~astropy.units.Quantity` with angle units
Offset angle
Returns
-------
psf : `TablePSF`
Table PSF
Examples
--------
>>> import numpy as np
>>> from astropy.coordinates import Angle
>>> from gammapy.irf import TablePSF
>>> TablePSF.from_shape(shape='gauss', width='0.2 deg',
... rad=Angle(np.linspace(0, 0.7, 100), 'deg'))
"""
width = Angle(width)
rad = Angle(rad)
if shape == "disk":
amplitude = 1 / (np.pi * width.radian ** 2)
psf_value = np.where(rad < width, amplitude, 0)
elif shape == "gauss":
gauss2d_pdf = Gauss2DPDF(sigma=width.radian)
psf_value = gauss2d_pdf(rad.radian)
else:
raise ValueError("Invalid shape: {}".format(shape))
psf_value = Quantity(psf_value, "sr^-1")
return cls(rad, psf_value)
[docs] def info(self):
"""Print basic info."""
ss = array_stats_str(self._rad.degree, "offset")
ss += "integral = {}\n".format(self.integral())
for containment in [50, 68, 80, 95]:
radius = self.containment_radius(0.01 * containment)
ss += "containment radius {} deg for {}%\n".format(
radius.degree, containment
)
return ss
# TODO: remove because it's not flexible enough?
[docs] def __call__(self, lon, lat):
"""Evaluate PSF at a 2D position.
The PSF is centered on ``(0, 0)``.
Parameters
----------
lon, lat : `~astropy.coordinates.Angle`
Longitude / latitude position
Returns
-------
psf_value : `~astropy.units.Quantity`
PSF value
"""
center = SkyCoord(0, 0, unit="radian")
point = SkyCoord(lon, lat)
rad = center.separation(point)
return self.evaluate(rad)
[docs] def evaluate(self, rad, quantity="dp_domega"):
r"""Evaluate PSF.
The following PSF quantities are available:
* 'dp_domega': PDF per 2-dim solid angle :math:`\Omega` in sr^-1
.. math:: \frac{dP}{d\Omega}
* 'dp_dr': PDF per 1-dim offset :math:`r` in radian^-1
.. math:: \frac{dP}{dr} = 2 \pi r \frac{dP}{d\Omega}
Parameters
----------
rad : `~astropy.coordinates.Angle`
Offset wrt source position
quantity : {'dp_domega', 'dp_dr'}
Which PSF quantity?
Returns
-------
psf_value : `~astropy.units.Quantity`
PSF value
"""
rad = Angle(rad)
shape = rad.shape
x = np.array(rad.radian).flat
if quantity == "dp_domega":
y = self._dp_domega_spline(x)
unit = "sr^-1"
elif quantity == "dp_dr":
y = self._dp_dr_spline(x)
unit = "radian^-1"
else:
ss = "Invalid quantity: {}\n".format(quantity)
ss += "Choose one of: 'dp_domega', 'dp_dr'"
raise ValueError(ss)
y = np.clip(a=y, a_min=0, a_max=None)
return Quantity(y, unit).reshape(shape)
[docs] def integral(self, rad_min=None, rad_max=None):
"""Compute PSF integral, aka containment fraction.
Parameters
----------
rad_min, rad_max : `~astropy.units.Quantity` with angle units
Offset angle range
Returns
-------
integral : float
PSF integral
"""
if rad_min is None:
rad_min = self._rad[0]
else:
rad_min = Angle(rad_min)
if rad_max is None:
rad_max = self._rad[-1]
else:
rad_max = Angle(rad_max)
rad_min = self._rad_clip(rad_min)
rad_max = self._rad_clip(rad_max)
cdf_min = self._cdf_spline(rad_min)
cdf_max = self._cdf_spline(rad_max)
return cdf_max - cdf_min
[docs] def containment_radius(self, fraction):
"""Containment radius.
Parameters
----------
fraction : array_like
Containment fraction (range 0 .. 1)
Returns
-------
rad : `~astropy.coordinates.Angle`
Containment radius angle
"""
rad = self._ppf_spline(fraction)
return Angle(rad, "radian").to("deg")
[docs] def normalize(self):
"""Normalize PSF to unit integral.
Computes the total PSF integral via the :math:`dP / dr` spline
and then divides the :math:`dP / dr` array.
"""
integral = self.integral()
self._dp_dr /= integral
# Clip to small positive number to avoid divide by 0
rad = np.clip(self._rad.radian, 1e-6, None)
rad = Quantity(rad, "radian")
self._dp_domega = self._dp_dr / (2 * np.pi * rad)
self._compute_splines(self._spline_kwargs)
[docs] def broaden(self, factor, normalize=True):
r"""Broaden PSF by scaling the offset array.
For a broadening factor :math:`f` and the offset
array :math:`r`, the offset array scaled
in the following way:
.. math::
r_{new} = f \times r_{old}
\frac{dP}{dr}(r_{new}) = \frac{dP}{dr}(r_{old})
Parameters
----------
factor : float
Broadening factor
normalize : bool
Normalize PSF after broadening
"""
self._rad *= factor
# We define broadening such that self._dp_domega remains the same
# so we only have to re-compute self._dp_dr and the slines here.
self._dp_dr = (2 * np.pi * self._rad * self._dp_domega).to("radian^-1")
self._compute_splines(self._spline_kwargs)
if normalize:
self.normalize()
[docs] def plot_psf_vs_rad(self, ax=None, quantity="dp_domega", **kwargs):
"""Plot PSF vs radius.
TODO: describe PSF ``quantity`` argument in a central place and link to it from here.
"""
import matplotlib.pyplot as plt
ax = plt.gca() if ax is None else ax
x = self._rad.to("deg")
y = self.evaluate(self._rad, quantity)
ax.plot(x.value, y.value, **kwargs)
ax.loglog()
ax.set_xlabel("Radius ({})".format(x.unit))
ax.set_ylabel("PSF ({})".format(y.unit))
def _compute_splines(self, spline_kwargs=DEFAULT_PSF_SPLINE_KWARGS):
"""Compute two splines representing the PSF.
* `_dp_domega_spline` is used to evaluate the 2D PSF.
* `_dp_dr_spline` is not really needed for most applications,
but is available via `eval`.
* `_cdf_spline` is used to compute integral and for normalisation.
* `_ppf_spline` is used to compute containment radii.
"""
# Compute spline and normalize.
x, y = self._rad.value, self._dp_domega.value
self._dp_domega_spline = UnivariateSpline(x, y, **spline_kwargs)
x, y = self._rad.value, self._dp_dr.value
self._dp_dr_spline = UnivariateSpline(x, y, **spline_kwargs)
# We use the terminology for scipy.stats distributions
# http://docs.scipy.org/doc/scipy/reference/tutorial/stats.html#common-methods
# cdf = "cumulative distribution function"
self._cdf_spline = self._dp_dr_spline.antiderivative()
# ppf = "percent point function" (inverse of cdf)
# Here's a discussion on methods to compute the ppf
# http://mail.scipy.org/pipermail/scipy-user/2010-May/025237.html
y = self._rad.value
x = self.integral(Angle(0, "rad"), self._rad)
# Since scipy 1.0 the UnivariateSpline requires that x is strictly increasing
# So only keep nodes where this is the case (and always keep the first one):
x, idx = np.unique(x, return_index=True)
y = y[idx]
# Dummy values, for cases where one really doesn't have a valid PSF.
if len(x) < 4:
x = [0, 1, 2, 3]
y = [0, 0, 0, 0]
self._ppf_spline = UnivariateSpline(x, y, **spline_kwargs)
def _rad_clip(self, rad):
"""Clip to radius support range, because spline extrapolation is unstable."""
rad = Angle(rad, "radian").radian
rad = np.clip(rad, 0, self._rad[-1].radian)
return rad
[docs]class EnergyDependentTablePSF(object):
"""Energy-dependent radially-symmetric table PSF (``gtpsf`` format).
TODO: add references and explanations.
Parameters
----------
energy : `~astropy.units.Quantity`
Energy (1-dim)
rad : `~astropy.units.Quantity` with angle units
Offset angle wrt source position (1-dim)
exposure : `~astropy.units.Quantity`
Exposure (1-dim)
psf_value : `~astropy.units.Quantity`
PSF (2-dim with axes: psf[energy_index, offset_index]
interp_kwargs : dict
Interpolation keyword arguments pass to `SCaledRegularGridInterpolator`.
"""
def __init__(self, energy, rad, exposure=None, psf_value=None, interp_kwargs=None):
self.energy = Quantity(energy).to("GeV")
self.rad = Quantity(rad).to("radian")
if exposure is None:
self.exposure = Quantity(np.ones(len(energy)), "cm^2 s")
else:
self.exposure = Quantity(exposure).to("cm^2 s")
if psf_value is None:
self.psf_value = Quantity(np.zeros(len(energy), len(rad)), "sr^-1")
else:
self.psf_value = Quantity(psf_value).to("sr^-1")
# Cache for TablePSF at each energy ... only computed when needed
self._table_psf_cache = [None] * len(self.energy)
interp_kwargs = interp_kwargs or {}
points = (self.energy.value, self.rad.value)
self._interpolate = ScaledRegularGridInterpolator(
points=points, values=self.psf_value, **interp_kwargs
)
def __str__(self):
ss = "EnergyDependentTablePSF\n"
ss += "-----------------------\n"
ss += "\nAxis info:\n"
ss += " " + array_stats_str(self.rad.to("deg"), "rad")
ss += " " + array_stats_str(self.energy, "energy")
# ss += ' ' + array_stats_str(self.exposure, 'exposure')
# ss += 'integral = {}\n'.format(self.integral())
ss += "\nContainment info:\n"
# Print some example containment radii
fractions = [0.68, 0.95]
energies = Quantity([10, 100], "GeV")
for fraction in fractions:
rads = self.containment_radius(energies=energies, fraction=fraction)
for energy, rad in zip(energies, rads):
ss += " " + "{}% containment radius at {:3.0f}: {:.2f}\n".format(
100 * fraction, energy, rad
)
return ss
[docs] @classmethod
def from_fits(cls, hdu_list):
"""Create `EnergyDependentTablePSF` from ``gtpsf`` format HDU list.
Parameters
----------
hdu_list : `~astropy.io.fits.HDUList`
HDU list with ``THETA`` and ``PSF`` extensions.
"""
rad = Angle(hdu_list["THETA"].data["Theta"], "deg")
energy = Quantity(hdu_list["PSF"].data["Energy"], "MeV")
exposure = Quantity(hdu_list["PSF"].data["Exposure"], "cm^2 s")
psf_value = Quantity(hdu_list["PSF"].data["PSF"], "sr^-1")
return cls(energy, rad, exposure, psf_value)
[docs] def to_fits(self):
"""Convert to FITS HDU list format.
Returns
-------
hdu_list : `~astropy.io.fits.HDUList`
PSF in HDU list format.
"""
# TODO: write HEADER keywords as gtpsf
data = self.rad
theta_hdu = fits.BinTableHDU(data=data, name="Theta")
data = [self.energy, self.exposure, self.psf_value]
psf_hdu = fits.BinTableHDU(data=data, name="PSF")
hdu_list = fits.HDUList([theta_hdu, psf_hdu])
return hdu_list
[docs] @classmethod
def read(cls, filename):
"""Create `EnergyDependentTablePSF` from ``gtpsf``-format FITS file.
Parameters
----------
filename : str
File name
"""
filename = str(make_path(filename))
with fits.open(filename, memmap=False) as hdulist:
psf = cls.from_fits(hdulist)
return psf
[docs] def write(self, *args, **kwargs):
"""Write to FITS file.
Calls `~astropy.io.fits.HDUList.writeto`, forwarding all arguments.
"""
self.to_fits().writeto(*args, **kwargs)
[docs] def evaluate(self, energy=None, rad=None, method="linear"):
"""Evaluate the PSF at a given energy and offset
Parameters
----------
energy : `~astropy.units.Quantity`
Energy value
rad : `~astropy.coordinates.Angle`
Offset wrt source position
method : {"linear", "nearest"}
Linear or nearest neighbour interpolation.
Returns
-------
values : `~astropy.units.Quantity`
Interpolated value
"""
if energy is None:
energy = self.energy
if rad is None:
rad = self.rad
energy = np.atleast_1d(Quantity(energy, "GeV").value)[:, np.newaxis]
rad = np.atleast_1d(Quantity(rad, "rad").value)
return self._interpolate((energy, rad), clip=True, method=method)
[docs] def table_psf_at_energy(self, energy, method="linear", **kwargs):
"""Create `~gammapy.irf.TablePSF` at one given energy.
Parameters
----------
energy : `~astropy.units.Quantity`
Energy
method : {"linear", "nearest"}
Linear or nearest neighbour interpolation.
Returns
-------
psf : `~gammapy.irf.TablePSF`
Table PSF
"""
psf_value = self.evaluate(energy=energy, method=method)[0, :]
return TablePSF(self.rad, psf_value, **kwargs)
[docs] def table_psf_in_energy_band(self, energy_band, spectrum=None, n_bins=11, **kwargs):
"""Average PSF in a given energy band.
Expected counts in sub energy bands given the given exposure
and spectrum are used as weights.
Parameters
----------
energy_band : `~astropy.units.Quantity`
Energy band
spectrum : `SpectralModel`
Spectral model used for weighting the PSF. Default is a power law
with index=2.
n_bins : int
Number of energy points in the energy band, used to compute the
weigthed PSF.
Returns
-------
psf : `TablePSF`
Table PSF
"""
from ..spectrum.models import PowerLaw, TableModel
if spectrum is None:
spectrum = PowerLaw()
exposure = TableModel(self.energy, self.exposure)
e_min, e_max = energy_band
energy = Energy.equal_log_spacing(emin=e_min, emax=e_max, nbins=n_bins)
weights = (spectrum * exposure)(energy)
weights /= weights.sum()
psf_value = self.evaluate(energy=energy)
psf_value_weighted = weights[:, np.newaxis] * psf_value
return TablePSF(self.rad, psf_value_weighted.sum(axis=0), **kwargs)
[docs] def containment_radius(self, energies, fraction, interp_kwargs=None):
"""Containment radius.
Parameters
----------
energies : `~astropy.units.Quantity`
Energy
fraction : float
Containment fraction in %
Returns
-------
rad : `~astropy.units.Quantity`
Containment radius in deg
"""
# TODO: figure out if there's a more efficient implementation to support
# arrays of energy
energies = np.atleast_1d(energies)
psfs = [self.table_psf_at_energy(energy, interp_kwargs) for energy in energies]
rad = [psf.containment_radius(fraction) for psf in psfs]
return Quantity(rad)
[docs] def integral(self, energy, rad_min, rad_max):
"""Containment fraction.
Parameters
----------
energy : `~astropy.units.Quantity`
Energy
rad_min, rad_max : `~astropy.coordinates.Angle`
Offset
Returns
-------
fraction : array_like
Containment fraction (in range 0 .. 1)
"""
# TODO: useless at the moment ... support array inputs or remove!
psf = self.table_psf_at_energy(energy)
return psf.integral(rad_min, rad_max)
[docs] def info(self):
"""Print basic info"""
print(str(self))
[docs] def plot_psf_vs_rad(self, energies=[1e4, 1e5, 1e6]):
"""Plot PSF vs radius.
Parameters
----------
TODO
"""
import matplotlib.pyplot as plt
plt.figure(figsize=(6, 4))
for energy in energies:
energy_index = self._energy_index(energy)
psf = self.psf_value[energy_index, :]
label = "{} GeV".format(1e-3 * energy)
x = np.hstack([-self.rad[::-1], self.rad])
y = 1e-6 * np.hstack([psf[::-1], psf])
plt.plot(x, y, lw=2, label=label)
# plt.semilogy()
# plt.loglog()
plt.legend()
plt.xlim(-0.2, 0.5)
plt.xlabel("Offset (deg)")
plt.ylabel("PSF (1e-6 sr^-1)")
plt.tight_layout()
[docs] def plot_containment_vs_energy(
self, ax=None, fractions=[0.63, 0.8, 0.95], **kwargs
):
"""Plot containment versus energy."""
import matplotlib.pyplot as plt
ax = plt.gca() if ax is None else ax
energy = Energy.equal_log_spacing(self.energy.min(), self.energy.max(), 10)
for fraction in fractions:
rad = self.containment_radius(energy, fraction)
label = "{:.1f}% Containment".format(100 * fraction)
ax.plot(energy.value, rad.value, label=label, **kwargs)
ax.semilogx()
ax.legend(loc="best")
ax.set_xlabel("Energy (GeV)")
ax.set_ylabel("Containment radius (deg)")
[docs] def plot_exposure_vs_energy(self):
"""Plot exposure versus energy."""
import matplotlib.pyplot as plt
plt.figure(figsize=(4, 3))
plt.plot(self.energy, self.exposure, color="black", lw=3)
plt.semilogx()
plt.xlabel("Energy (MeV)")
plt.ylabel("Exposure (cm^2 s)")
plt.xlim(1e4 / 1.3, 1.3 * 1e6)
plt.ylim(0, 1.5e11)
plt.tight_layout()
def _energy_index(self, energy):
"""Find energy array index.
"""
# TODO: test with array input
return np.searchsorted(self.energy, energy)
def _get_1d_psf_values(self, energy_index):
"""Get 1-dim PSF value array.
Parameters
----------
energy_index : int
Energy index
Returns
-------
psf_values : `~astropy.units.Quantity`
PSF value array
"""
psf_values = self.psf_value[energy_index, :].flatten().copy()
# When the PSF Table is not filled (with nan), the psf estimation at a given energy crashes
psf_values[np.isnan(psf_values)] = 0
return psf_values
def _get_1d_table_psf(self, energy_index, **kwargs):
"""Get 1-dim TablePSF (cached).
Parameters
----------
energy_index : int
Energy index
Returns
-------
table_psf : `TablePSF`
Table PSF
"""
# TODO: support array_like `energy_index` here?
if self._table_psf_cache[energy_index] is None:
psf_value = self._get_1d_psf_values(energy_index)
table_psf = TablePSF(self.rad, psf_value, **kwargs)
self._table_psf_cache[energy_index] = table_psf
return self._table_psf_cache[energy_index]