# Licensed under a 3-clause BSD style license - see LICENSE.rst
import logging
import numpy as np
from astropy import units as u
from astropy.convolution import Gaussian2DKernel
from astropy.coordinates import Angle
from astropy.io import fits
from astropy.stats import gaussian_fwhm_to_sigma
from astropy.table import Table
from gammapy.maps import MapAxes, MapAxis
from gammapy.utils.array import array_stats_str
from gammapy.utils.gauss import MultiGauss2D
from gammapy.utils.interpolation import ScaledRegularGridInterpolator
from gammapy.utils.scripts import make_path
from .psf_3d import PSF3D
from .psf_table import EnergyDependentTablePSF
__all__ = ["EnergyDependentMultiGaussPSF"]
log = logging.getLogger(__name__)
[docs]class EnergyDependentMultiGaussPSF:
"""Triple Gauss analytical PSF depending on energy and theta.
To evaluate the PSF call the ``to_energy_dependent_table_psf`` or ``psf_at_energy_and_theta`` methods.
Parameters
----------
energy_axis_true : `MapAxis`
True energy axis
offset_axis : `MapAxis`
Offset axis.
sigmas : list of 'numpy.ndarray'
Triple Gauss sigma parameters, where every entry is
a two dimensional 'numpy.ndarray' containing the sigma
value for every given energy and theta.
norms : list of 'numpy.ndarray'
Triple Gauss norm parameters, where every entry is
a two dimensional 'numpy.ndarray' containing the norm
value for every given energy and theta. Norm corresponds
to the value of the Gaussian at theta = 0.
energy_thresh_lo : `~astropy.units.u.Quantity`
Lower save energy threshold of the psf.
energy_thresh_hi : `~astropy.units.u.Quantity`
Upper save energy threshold of the psf.
Examples
--------
Plot R68 of the PSF vs. theta and energy:
.. plot::
:include-source:
import matplotlib.pyplot as plt
from gammapy.irf import EnergyDependentMultiGaussPSF
filename = '$GAMMAPY_DATA/cta-1dc/caldb/data/cta/1dc/bcf/South_z20_50h/irf_file.fits'
psf = EnergyDependentMultiGaussPSF.read(filename, hdu='POINT SPREAD FUNCTION')
psf.plot_containment(0.68)
plt.show()
"""
tag = "psf_3gauss"
def __init__(
self,
energy_axis_true,
offset_axis,
sigmas,
norms,
energy_thresh_lo="0.1 TeV",
energy_thresh_hi="100 TeV",
):
assert energy_axis_true.name == "energy_true"
assert offset_axis.name == "offset"
self._energy_axis_true = energy_axis_true
self._offset_axis = offset_axis
sigmas[0][sigmas[0] == 0] = 1
sigmas[1][sigmas[1] == 0] = 1
sigmas[2][sigmas[2] == 0] = 1
self.sigmas = sigmas
self.norms = norms
self.energy_thresh_lo = u.Quantity(energy_thresh_lo, "TeV")
self.energy_thresh_hi = u.Quantity(energy_thresh_hi, "TeV")
self._interp_norms = self._setup_interpolators(self.norms)
self._interp_sigmas = self._setup_interpolators(self.sigmas)
@property
def energy_axis_true(self):
return self._energy_axis_true
@property
def offset_axis(self):
return self._offset_axis
def _setup_interpolators(self, values_list):
interps = []
for values in values_list:
interp = ScaledRegularGridInterpolator(
points=(self.offset_axis.center, self.energy_axis_true.center),
values=values,
)
interps.append(interp)
return interps
[docs] @classmethod
def read(cls, filename, hdu="PSF_2D_GAUSS"):
"""Create `EnergyDependentMultiGaussPSF` from FITS file.
Parameters
----------
filename : str
File name
"""
with fits.open(str(make_path(filename)), memmap=False) as hdulist:
return cls.from_table_hdu(hdulist[hdu])
[docs] @classmethod
def from_table_hdu(cls, hdu):
"""Create `EnergyDependentMultiGaussPSF` from HDU list.
Parameters
----------
hdu : `~astropy.io.fits.BinTableHDU`
HDU
"""
table = Table.read(hdu)
energy_axis_true = MapAxis.from_table(
table, column_prefix="ENERG", format="gadf-dl3"
)
offset_axis = MapAxis.from_table(
table, column_prefix="THETA", format="gadf-dl3"
)
# Get sigmas
shape = (offset_axis.nbin, energy_axis_true.nbin)
sigmas = []
for key in ["SIGMA_1", "SIGMA_2", "SIGMA_3"]:
sigma = hdu.data[key].reshape(shape).copy()
sigmas.append(sigma)
# Get amplitudes
norms = []
for key in ["SCALE", "AMPL_2", "AMPL_3"]:
norm = hdu.data[key].reshape(shape).copy()
norms.append(norm)
opts = {}
try:
opts["energy_thresh_lo"] = u.Quantity(hdu.header["LO_THRES"], "TeV")
opts["energy_thresh_hi"] = u.Quantity(hdu.header["HI_THRES"], "TeV")
except KeyError:
pass
return cls(
energy_axis_true=energy_axis_true,
offset_axis=offset_axis,
sigmas=sigmas,
norms=norms,
**opts,
)
[docs] def to_hdulist(self):
"""
Convert psf table data to FITS hdu list.
Returns
-------
hdu_list : `~astropy.io.fits.HDUList`
PSF in HDU list format.
"""
# Set up data
names = [
"SCALE",
"SIGMA_1",
"AMPL_2",
"SIGMA_2",
"AMPL_3",
"SIGMA_3",
]
units = ["", "deg", "", "deg", "", "deg"]
data = [
self.norms[0],
self.sigmas[0],
self.norms[1],
self.sigmas[1],
self.norms[2],
self.sigmas[2],
]
axes = MapAxes([self.energy_axis_true, self.offset_axis])
table = axes.to_table(format="gadf-dl3")
for name_, data_, unit_ in zip(names, data, units):
table[name_] = [data_]
table[name_].unit = unit_
# Create hdu and hdu list
hdu = fits.BinTableHDU(table)
hdu.header["LO_THRES"] = self.energy_thresh_lo.value
hdu.header["HI_THRES"] = self.energy_thresh_hi.value
return fits.HDUList([fits.PrimaryHDU(), hdu])
[docs] def write(self, filename, *args, **kwargs):
"""Write PSF to FITS file.
Calls `~astropy.io.fits.HDUList.writeto`, forwarding all arguments.
"""
self.to_hdulist().writeto(str(make_path(filename)), *args, **kwargs)
[docs] def psf_at_energy_and_theta(self, energy, theta):
"""
Get `~gammapy.modeling.models.MultiGauss2D` model for given energy and theta.
No interpolation is used.
Parameters
----------
energy : `~astropy.units.u.Quantity`
Energy at which a PSF is requested.
theta : `~astropy.coordinates.Angle`
Offset angle at which a PSF is requested.
Returns
-------
psf : `~gammapy.morphology.MultiGauss2D`
Multigauss PSF object.
"""
energy = u.Quantity(energy)
theta = u.Quantity(theta)
pars = {}
for name, interp_norm in zip(["scale", "A_2", "A_3"], self._interp_norms):
pars[name] = interp_norm((theta, energy))
for idx, interp_sigma in enumerate(self._interp_sigmas):
pars[f"sigma_{idx + 1}"] = interp_sigma((theta, energy))
psf = HESSMultiGaussPSF(pars)
return psf.to_MultiGauss2D(normalize=True)
[docs] def containment_radius(self, energy, theta, fraction=0.68):
"""Compute containment for all energy and theta values"""
# This is a false positive from pylint
# See https://github.com/PyCQA/pylint/issues/2435
energies = u.Quantity(
energy
).flatten() # pylint:disable=assignment-from-no-return
thetas = Angle(theta).flatten()
radius = np.empty((theta.size, energy.size))
for idx, energy in enumerate(energies):
for jdx, theta in enumerate(thetas):
try:
psf = self.psf_at_energy_and_theta(energy, theta)
radius[jdx, idx] = psf.containment_radius(fraction)
except ValueError:
log.debug(
f"Computing containment failed for energy = {energy:.2f}"
f" and theta={theta:.2f}"
)
log.debug(f"Sigmas: {psf.sigmas} Norms: {psf.norms}")
radius[jdx, idx] = np.nan
return Angle(radius, "deg")
[docs] def plot_containment(self, fraction=0.68, ax=None, add_cbar=True, **kwargs):
"""
Plot containment image with energy and theta axes.
Parameters
----------
fraction : float
Containment fraction between 0 and 1.
add_cbar : bool
Add a colorbar
"""
import matplotlib.pyplot as plt
ax = plt.gca() if ax is None else ax
energy = self.energy_axis_true.center
offset = self.offset_axis.center
# Set up and compute data
containment = self.containment_radius(energy, offset, fraction)
# plotting defaults
kwargs.setdefault("cmap", "GnBu")
kwargs.setdefault("vmin", np.nanmin(containment.value))
kwargs.setdefault("vmax", np.nanmax(containment.value))
# Plotting
x = energy.value
y = offset.value
caxes = ax.pcolormesh(x, y, containment.value, **kwargs)
# Axes labels and ticks, colobar
ax.semilogx()
ax.set_ylabel(f"Offset ({offset.unit})")
ax.set_xlabel(f"Energy ({energy.unit})")
ax.set_xlim(x.min(), x.max())
ax.set_ylim(y.min(), y.max())
self._plot_safe_energy_range(ax)
if add_cbar:
label = f"Containment radius R{100 * fraction:.0f} ({containment.unit})"
ax.figure.colorbar(caxes, ax=ax, label=label)
return ax
def _plot_safe_energy_range(self, ax):
"""add safe energy range lines to the plot"""
esafe = self.energy_thresh_lo
omin = self.offset_axis.center.min()
omax = self.offset_axis.center.max()
ax.vlines(x=esafe.value, ymin=omin.value, ymax=omax.value)
label = f"Safe energy threshold: {esafe:3.2f}"
ax.text(x=1.1 * esafe.value, y=0.3, s=label, va="top")
[docs] def plot_containment_vs_energy(
self, fractions=[0.68, 0.95], thetas=Angle([0, 1], "deg"), ax=None, **kwargs
):
"""Plot containment fraction as a function of energy.
"""
import matplotlib.pyplot as plt
ax = plt.gca() if ax is None else ax
energy = self.energy_axis_true.center
for theta in thetas:
for fraction in fractions:
radius = self.containment_radius(energy, theta, fraction).squeeze()
kwargs.setdefault("label", f"{theta.deg} deg, {100 * fraction:.1f}%")
ax.plot(energy.value, radius.value, **kwargs)
ax.semilogx()
ax.legend(loc="best")
ax.set_xlabel("Energy (TeV)")
ax.set_ylabel("Containment radius (deg)")
[docs] def peek(self, figsize=(15, 5)):
"""Quick-look summary plots."""
import matplotlib.pyplot as plt
fig, axes = plt.subplots(nrows=1, ncols=3, figsize=figsize)
self.plot_containment(fraction=0.68, ax=axes[0])
self.plot_containment(fraction=0.95, ax=axes[1])
self.plot_containment_vs_energy(ax=axes[2])
# TODO: implement this plot
# psf = self.psf_at_energy_and_theta(energy='1 TeV', theta='1 deg')
# psf.plot_components(ax=axes[2])
plt.tight_layout()
[docs] def info(
self,
fractions=[0.68, 0.95],
energies=u.Quantity([1.0, 10.0], "TeV"),
thetas=u.Quantity([0.0], "deg"),
):
"""
Print PSF summary info.
The containment radius for given fraction, energies and thetas is
computed and printed on the command line.
Parameters
----------
fractions : list
Containment fraction to compute containment radius for.
energies : `~astropy.units.u.Quantity`
Energies to compute containment radius for.
thetas : `~astropy.units.u.Quantity`
Thetas to compute containment radius for.
Returns
-------
ss : string
Formatted string containing the summary info.
"""
ss = "\nSummary PSF info\n"
ss += "----------------\n"
ss += array_stats_str(self.offset_axis.center.to("deg"), "Theta")
ss += array_stats_str(self.energy_axis_true.edges[1:], "Energy hi")
ss += array_stats_str(self.energy_axis_true.edges[:-1], "Energy lo")
ss += f"Safe energy threshold lo: {self.energy_thresh_lo:6.3f}\n"
ss += f"Safe energy threshold hi: {self.energy_thresh_hi:6.3f}\n"
for fraction in fractions:
containment = self.containment_radius(energies, thetas, fraction)
for i, energy in enumerate(energies):
for j, theta in enumerate(thetas):
radius = containment[j, i]
ss += (
"{:2.0f}% containment radius at theta = {} and "
"E = {:4.1f}: {:5.8f}\n"
"".format(100 * fraction, theta, energy, radius)
)
return ss
[docs] def to_energy_dependent_table_psf(self, theta=None, rad=None, exposure=None):
"""Convert triple Gaussian PSF ot table PSF.
Parameters
----------
theta : `~astropy.coordinates.Angle`
Offset in the field of view. Default theta = 0 deg
rad : `~astropy.coordinates.Angle`
Offset from PSF center used for evaluating the PSF on a grid.
Default offset = [0, 0.005, ..., 1.495, 1.5] deg.
exposure : `~astropy.units.u.Quantity`
Energy dependent exposure. Should be in units equivalent to 'cm^2 s'.
Default exposure = 1.
Returns
-------
tabe_psf : `~gammapy.irf.EnergyDependentTablePSF`
Instance of `EnergyDependentTablePSF`.
"""
# Convert energies to log center
energies = self.energy_axis_true.center
# Defaults and input handling
if theta is None:
theta = Angle(0, "deg")
else:
theta = Angle(theta)
if rad is None:
rad = Angle(np.arange(0, 1.5, 0.005), "deg")
rad_axis = MapAxis.from_nodes(rad, name="rad")
psf_value = u.Quantity(np.zeros((energies.size, rad.size)), "deg^-2")
for idx, energy in enumerate(energies):
psf_gauss = self.psf_at_energy_and_theta(energy, theta)
psf_value[idx] = u.Quantity(psf_gauss(rad), "deg^-2")
return EnergyDependentTablePSF(
energy_axis_true=self.energy_axis_true,
rad_axis=rad_axis,
exposure=exposure,
psf_value=psf_value,
)
[docs] def to_psf3d(self, rad=None):
"""Create a PSF3D from an analytical PSF.
Parameters
----------
rad : `~astropy.units.u.Quantity` or `~astropy.coordinates.Angle`
the array of position errors (rad) on which the PSF3D will be defined
Returns
-------
psf3d : `~gammapy.irf.PSF3D`
the PSF3D. It will be defined on the same energy and offset values than the input psf.
"""
offsets = self.offset_axis.center
energy = self.energy_axis_true.center
if rad is None:
rad = np.linspace(0, 0.66, 67) * u.deg
rad_axis = MapAxis.from_edges(rad, name="rad")
shape = (self.energy_axis_true.nbin, self.offset_axis.nbin, rad_axis.nbin)
psf_value = np.zeros(shape) * u.Unit("sr-1")
for idx, offset in enumerate(offsets):
table_psf = self.to_energy_dependent_table_psf(offset)
psf_value[:, idx, :] = table_psf.evaluate(energy, rad_axis.center)
return PSF3D(
energy_axis_true=self.energy_axis_true,
rad_axis=rad_axis,
offset_axis=self.offset_axis,
psf_value=psf_value,
energy_thresh_lo=self.energy_thresh_lo,
energy_thresh_hi=self.energy_thresh_hi,
)
class HESSMultiGaussPSF:
"""Multi-Gauss PSF as represented in the HESS software.
The 2D Gaussian is represented as a 1D exponential
probability density function per offset angle squared:
dp / dtheta**2 = [0]*(exp(-x/(2*[1]*[1]))+[2]*exp(-x/(2*[3]*[3]))
@param source: either a dict of a filename
The following two parameters control numerical
precision / speed. Usually the defaults are fine.
@param theta_max: Maximum offset in numerical computations
@param npoints: Number of points in numerical computations
@param eps: Allowed tolerance on normalization of total P to 1
"""
def __init__(self, source):
if isinstance(source, dict):
# Assume source is a dict with correct format
self.pars = source
else:
# Assume source is a filename with correct format
self.pars = self._read_ascii(source)
# Scale will be computed from normalization anyways,
# so any default is fine here
self.pars["scale"] = self.pars.get("scale", 1)
# This avoids handling the first PSF as a special case
self.pars["A_1"] = self.pars.get("A_1", 1)
def _read_ascii(self, filename):
"""Parse file with parameters."""
fh = open(filename) # .readlines()
pars = {}
for line in fh:
try:
key, value = line.strip().split()[:2]
if key.startswith("#"):
continue
else:
pars[key] = float(value)
except ValueError:
pass
fh.close()
return pars
def n_gauss(self):
"""Count number of Gaussians."""
return len([_ for _ in self.pars.keys() if "sigma" in _])
def dpdtheta2(self, theta2):
"""dp / dtheta2 at position theta2 = theta ^ 2."""
theta2 = np.asarray(theta2, "f")
total = np.zeros_like(theta2)
for ii in range(1, self.n_gauss() + 1):
A = self.pars[f"A_{ii}"]
sigma = self.pars[f"sigma_{ii}"]
total += A * np.exp(-theta2 / (2 * sigma ** 2))
return self.pars["scale"] * total
def to_MultiGauss2D(self, normalize=True):
"""Use this to compute containment angles and fractions.
Note: We have to set norm = 2 * A * sigma ^ 2, because in
MultiGauss2D norm represents the integral, and in HESS A
represents the amplitude at 0.
"""
sigmas, norms = [], []
for ii in range(1, self.n_gauss() + 1):
A = self.pars[f"A_{ii}"]
sigma = self.pars[f"sigma_{ii}"]
norm = self.pars["scale"] * 2 * A * sigma ** 2
sigmas.append(sigma)
norms.append(norm)
m = MultiGauss2D(sigmas, norms)
if normalize:
m.normalize()
return m
def multi_gauss_psf_kernel(psf_parameters, BINSZ=0.02, NEW_BINSZ=0.02, **kwargs):
"""Create multi-Gauss PSF kernel.
The Gaussian PSF components are specified via the
amplitude at the center and the FWHM.
See the example for the exact format.
Parameters
----------
psf_parameters : dict
PSF parameters
BINSZ : float (0.02)
Pixel size used for the given parameters in deg.
NEW_BINSZ : float (0.02)
New pixel size in deg. USed to change the resolution of the PSF.
Returns
-------
psf_kernel : `astropy.convolution.Kernel2D`
PSF kernel
Examples
--------
>>> psf_pars = dict()
>>> psf_pars['psf1'] = dict(ampl=1, fwhm=2.5)
>>> psf_pars['psf2'] = dict(ampl=0.06, fwhm=11.14)
>>> psf_pars['psf3'] = dict(ampl=0.47, fwhm=5.16)
>>> psf_kernel = multi_gauss_psf_kernel(psf_pars, x_size=51)
"""
psf = None
for ii in range(1, 4):
# Convert sigma and amplitude
pars = psf_parameters[f"psf{ii}"]
sigma = gaussian_fwhm_to_sigma * pars["fwhm"] * BINSZ / NEW_BINSZ
ampl = 2 * np.pi * sigma ** 2 * pars["ampl"]
if psf is None:
psf = float(ampl) * Gaussian2DKernel(sigma, **kwargs)
else:
psf += float(ampl) * Gaussian2DKernel(sigma, **kwargs)
psf.normalize()
return psf