# Licensed under a 3-clause BSD style license - see LICENSE.rst
import numpy as np
import scipy.special
from scipy.interpolate import interp1d
from astropy.coordinates import Angle
from astropy.io import fits
from astropy.table import Table
from astropy.units import Quantity
from gammapy.maps import MapAxis
from gammapy.maps.utils import edges_from_lo_hi
from gammapy.utils.nddata import NDDataArray
from gammapy.utils.scripts import make_path
from .edisp_kernel import EDispKernel
__all__ = ["EnergyDispersion2D"]
[docs]class EnergyDispersion2D:
"""Offset-dependent energy dispersion matrix.
Data format specification: :ref:`gadf:edisp_2d`
Parameters
----------
e_true_lo, e_true_hi : `~astropy.units.Quantity`
True energy axis binning
migra_lo, migra_hi : `~numpy.ndarray`
Energy migration axis binning
offset_lo, offset_hi : `~astropy.coordinates.Angle`
Field of view offset axis binning
data : `~numpy.ndarray`
Energy dispersion probability density
Examples
--------
Read energy dispersion IRF from disk:
>>> from gammapy.maps import MapAxis
>>> from gammapy.irf import EnergyDispersion2D
>>> filename = '$GAMMAPY_DATA/hess-dl3-dr1/data/hess_dl3_dr1_obs_id_020136.fits.gz'
>>> edisp2d = EnergyDispersion2D.read(filename, hdu="EDISP")
Create energy dispersion matrix (`~gammapy.irf.EnergyDispersion`)
for a given field of view offset and energy binning:
>>> energy = MapAxis.from_bounds(0.1, 20, nbin=60, unit="TeV", interp="log").edges
>>> edisp = edisp2d.to_energy_dispersion(offset='1.2 deg', e_reco=energy, e_true=energy)
See Also
--------
EnergyDispersion
"""
default_interp_kwargs = dict(bounds_error=False, fill_value=None)
"""Default Interpolation kwargs for `~gammapy.utils.nddata.NDDataArray`. Extrapolate."""
def __init__(
self,
e_true_lo,
e_true_hi,
migra_lo,
migra_hi,
offset_lo,
offset_hi,
data,
interp_kwargs=None,
meta=None,
):
if interp_kwargs is None:
interp_kwargs = self.default_interp_kwargs
e_true_edges = edges_from_lo_hi(e_true_lo, e_true_hi)
e_true_axis = MapAxis.from_edges(e_true_edges, interp="log", name="energy_true")
migra_edges = edges_from_lo_hi(migra_lo, migra_hi)
migra_axis = MapAxis.from_edges(
migra_edges, interp="lin", name="migra", unit=""
)
# TODO: for some reason the H.E.S.S. DL3 files contain the same values for offset_hi and offset_lo
if np.allclose(offset_lo.to_value("deg"), offset_hi.to_value("deg")):
offset_axis = MapAxis.from_nodes(offset_lo, interp="lin", name="offset")
else:
offset_edges = edges_from_lo_hi(offset_lo, offset_hi)
offset_axis = MapAxis.from_edges(offset_edges, interp="lin", name="offset")
axes = [e_true_axis, migra_axis, offset_axis]
self.data = NDDataArray(axes=axes, data=data, interp_kwargs=interp_kwargs)
self.meta = meta or {}
def __str__(self):
ss = self.__class__.__name__
ss += f"\n{self.data}"
return ss
[docs] @classmethod
def from_gauss(cls, e_true, migra, bias, sigma, offset, pdf_threshold=1e-6):
"""Create Gaussian energy dispersion matrix (`EnergyDispersion2D`).
The output matrix will be Gaussian in (e_true / e_reco).
The ``bias`` and ``sigma`` should be either floats or arrays of same dimension than
``e_true``. ``bias`` refers to the mean value of the ``migra``
distribution minus one, i.e. ``bias=0`` means no bias.
Note that, the output matrix is flat in offset.
Parameters
----------
e_true : `~astropy.units.Quantity`
Bin edges of true energy axis
migra : `~astropy.units.Quantity`
Bin edges of migra axis
bias : float or `~numpy.ndarray`
Center of Gaussian energy dispersion, bias
sigma : float or `~numpy.ndarray`
RMS width of Gaussian energy dispersion, resolution
offset : `~astropy.units.Quantity`
Bin edges of offset
pdf_threshold : float, optional
Zero suppression threshold
"""
e_true = Quantity(e_true)
# erf does not work with Quantities
true = MapAxis.from_edges(e_true, interp="log").center.to_value("TeV")
true2d, migra2d = np.meshgrid(true, migra)
migra2d_lo = migra2d[:-1, :]
migra2d_hi = migra2d[1:, :]
# Analytical formula for integral of Gaussian
s = np.sqrt(2) * sigma
t1 = (migra2d_hi - 1 - bias) / s
t2 = (migra2d_lo - 1 - bias) / s
pdf = (scipy.special.erf(t1) - scipy.special.erf(t2)) / 2
pdf_array = pdf.T[:, :, np.newaxis] * np.ones(len(offset) - 1)
pdf_array[pdf_array < pdf_threshold] = 0
return cls(
e_true[:-1],
e_true[1:],
migra[:-1],
migra[1:],
offset[:-1],
offset[1:],
pdf_array,
)
[docs] @classmethod
def from_table(cls, table):
"""Create from `~astropy.table.Table`."""
if "ENERG_LO" in table.colnames:
e_lo = table["ENERG_LO"].quantity[0]
e_hi = table["ENERG_HI"].quantity[0]
elif "ETRUE_LO" in table.colnames:
e_lo = table["ETRUE_LO"].quantity[0]
e_hi = table["ETRUE_HI"].quantity[0]
else:
raise ValueError(
'Invalid column names. Need "ENERG_LO/ENERG_HI" or "ETRUE_LO/ETRUE_HI"'
)
o_lo = table["THETA_LO"].quantity[0]
o_hi = table["THETA_HI"].quantity[0]
m_lo = table["MIGRA_LO"].quantity[0]
m_hi = table["MIGRA_HI"].quantity[0]
# TODO Why does this need to be transposed?
matrix = table["MATRIX"].quantity[0].transpose()
return cls(
e_true_lo=e_lo,
e_true_hi=e_hi,
offset_lo=o_lo,
offset_hi=o_hi,
migra_lo=m_lo,
migra_hi=m_hi,
data=matrix,
)
[docs] @classmethod
def from_hdulist(cls, hdulist, hdu="edisp_2d"):
"""Create from `~astropy.io.fits.HDUList`."""
return cls.from_table(Table.read(hdulist[hdu]))
[docs] @classmethod
def read(cls, filename, hdu="edisp_2d"):
"""Read from FITS file.
Parameters
----------
filename : str
File name
"""
with fits.open(make_path(filename), memmap=False) as hdulist:
return cls.from_hdulist(hdulist, hdu)
[docs] def to_energy_dispersion(self, offset, e_true=None, e_reco=None):
"""Detector response R(Delta E_reco, Delta E_true)
Probability to reconstruct an energy in a given true energy band
in a given reconstructed energy band
Parameters
----------
offset : `~astropy.coordinates.Angle`
Offset
e_true : `~astropy.units.Quantity`, None
True energy axis
e_reco : `~astropy.units.Quantity`
Reconstructed energy axis
Returns
-------
edisp : `~gammapy.irf.EnergyDispersion`
Energy dispersion matrix
"""
offset = Angle(offset)
e_true = self.data.axis("energy_true").edges if e_true is None else e_true
e_reco = self.data.axis("energy_true").edges if e_reco is None else e_reco
data = []
for energy in MapAxis.from_edges(e_true, interp="log").center:
vec = self.get_response(offset=offset, e_true=energy, e_reco=e_reco)
data.append(vec)
data = np.asarray(data)
e_lo, e_hi = e_true[:-1], e_true[1:]
ereco_lo, ereco_hi = (e_reco[:-1], e_reco[1:])
return EDispKernel(
e_true_lo=e_lo,
e_true_hi=e_hi,
e_reco_lo=ereco_lo,
e_reco_hi=ereco_hi,
data=data,
)
[docs] def get_response(self, offset, e_true, e_reco=None):
"""Detector response R(Delta E_reco, E_true)
Probability to reconstruct a given true energy in a given reconstructed
energy band. In each reco bin, you integrate with a riemann sum over
the default migra bin of your analysis.
Parameters
----------
e_true : `~astropy.units.Quantity`
True energy
e_reco : `~astropy.units.Quantity`, None
Reconstructed energy axis
offset : `~astropy.coordinates.Angle`
Offset
Returns
-------
rv : `~numpy.ndarray`
Redistribution vector
"""
e_true = Quantity(e_true)
migra_axis = self.data.axis("migra")
if e_reco is None:
# Default: e_reco nodes = migra nodes * e_true nodes
e_reco = migra_axis.edges * e_true
else:
# Translate given e_reco binning to migra at bin center
e_reco = Quantity(e_reco)
# migration value of e_reco bounds
migra = e_reco / e_true
values = self.data.evaluate(
offset=offset, energy_true=e_true, migra=migra_axis.center
)
cumsum = np.insert(values, 0, 0).cumsum()
with np.errstate(invalid="ignore"):
cumsum = np.nan_to_num(cumsum / cumsum[-1])
f = interp1d(
migra_axis.edges.value,
cumsum,
kind="linear",
bounds_error=False,
fill_value=(0, 1),
)
# We compute the difference between 2 successive bounds in e_reco
# to get integral over reco energy bin
integral = np.diff(np.clip(f(migra), a_min=0, a_max=1))
return integral
[docs] def plot_migration(self, ax=None, offset=None, e_true=None, migra=None, **kwargs):
"""Plot energy dispersion for given offset and true energy.
Parameters
----------
ax : `~matplotlib.axes.Axes`, optional
Axis
offset : `~astropy.coordinates.Angle`, optional
Offset
e_true : `~astropy.units.Quantity`, optional
True energy
migra : `~numpy.ndarray`, optional
Migration nodes
Returns
-------
ax : `~matplotlib.axes.Axes`
Axis
"""
import matplotlib.pyplot as plt
ax = plt.gca() if ax is None else ax
if offset is None:
offset = Angle([1], "deg")
else:
offset = np.atleast_1d(Angle(offset))
if e_true is None:
e_true = Quantity([0.1, 1, 10], "TeV")
else:
e_true = np.atleast_1d(Quantity(e_true))
migra = self.data.axis("migra").center if migra is None else migra
for ener in e_true:
for off in offset:
disp = self.data.evaluate(offset=off, energy_true=ener, migra=migra)
label = f"offset = {off:.1f}\nenergy = {ener:.1f}"
ax.plot(migra, disp, label=label, **kwargs)
ax.set_xlabel(r"$E_\mathrm{{Reco}} / E_\mathrm{{True}}$")
ax.set_ylabel("Probability density")
ax.legend(loc="upper left")
return ax
[docs] def plot_bias(self, ax=None, offset=None, add_cbar=False, **kwargs):
"""Plot migration as a function of true energy for a given offset.
Parameters
----------
ax : `~matplotlib.axes.Axes`, optional
Axis
offset : `~astropy.coordinates.Angle`, optional
Offset
add_cbar : bool
Add a colorbar to the plot.
kwargs : dict
Keyword arguments passed to `~matplotlib.pyplot.pcolormesh`.
Returns
-------
ax : `~matplotlib.axes.Axes`
Axis
"""
from matplotlib.colors import PowerNorm
import matplotlib.pyplot as plt
kwargs.setdefault("cmap", "GnBu")
kwargs.setdefault("norm", PowerNorm(gamma=0.5))
ax = plt.gca() if ax is None else ax
if offset is None:
offset = Angle(1, "deg")
e_true = self.data.axis("energy_true")
migra = self.data.axis("migra")
x = e_true.edges.value
y = migra.edges.value
z = self.data.evaluate(
offset=offset,
energy_true=e_true.center.reshape(1, -1, 1),
migra=migra.center.reshape(1, 1, -1),
).value[0]
caxes = ax.pcolormesh(x, y, z.T, **kwargs)
if add_cbar:
label = "Probability density (A.U.)"
ax.figure.colorbar(caxes, ax=ax, label=label)
ax.set_xlabel(fr"$E_\mathrm{{True}}$ [{e_true.unit}]")
ax.set_ylabel(r"$E_\mathrm{{Reco}} / E_\mathrm{{True}}$")
ax.set_xlim(x.min(), x.max())
ax.set_ylim(y.min(), y.max())
ax.set_xscale("log")
return ax
[docs] def peek(self, figsize=(15, 5)):
"""Quick-look summary plots.
Parameters
----------
figsize : (float, float)
Size of the resulting plot
"""
import matplotlib.pyplot as plt
fig, axes = plt.subplots(nrows=1, ncols=3, figsize=figsize)
self.plot_bias(ax=axes[0])
self.plot_migration(ax=axes[1])
edisp = self.to_energy_dispersion(offset="1 deg")
edisp.plot_matrix(ax=axes[2])
plt.tight_layout()
[docs] def to_table(self):
"""Convert to `~astropy.table.Table`."""
meta = self.meta.copy()
energy = self.data.axis("energy_true").edges
migra = self.data.axis("migra").edges
theta = self.data.axis("offset").edges
table = Table(meta=meta)
table["ENERG_LO"] = energy[:-1][np.newaxis]
table["ENERG_HI"] = energy[1:][np.newaxis]
table["MIGRA_LO"] = migra[:-1][np.newaxis]
table["MIGRA_HI"] = migra[1:][np.newaxis]
table["THETA_LO"] = theta[:-1][np.newaxis]
table["THETA_HI"] = theta[1:][np.newaxis]
table["MATRIX"] = self.data.data.T[np.newaxis]
return table
[docs] def to_fits(self, name="ENERGY DISPERSION"):
"""Convert to `~astropy.io.fits.BinTable`."""
return fits.BinTableHDU(self.to_table(), name=name)