# Licensed under a 3-clause BSD style license - see LICENSE.rst
import numpy as np
from astropy.io import fits
from astropy.table import Table
from astropy.units import Quantity
from astropy.visualization import quantity_support
import matplotlib.pyplot as plt
from matplotlib.colors import PowerNorm
from gammapy.maps import MapAxis
from gammapy.utils.scripts import make_path
from ..core import IRF
__all__ = ["EDispKernel"]
[docs]class EDispKernel(IRF):
"""Energy dispersion matrix.
Data format specification: :ref:`gadf:ogip-rmf`
Parameters
----------
energy_axis_true : `~gammapy.maps.MapAxis`
True energy axis. Its name must be "energy_true"
energy_axis : `~gammapy.maps.MapAxis`
Reconstructed energy axis. Its name must be "energy"
data : array_like
2-dim energy dispersion matrix
Examples
--------
Create a Gaussian energy dispersion matrix::
>>> from gammapy.maps import MapAxis
>>> from gammapy.irf import EDispKernel
>>> energy = MapAxis.from_energy_bounds(0.1,10,10, unit='TeV')
>>> energy_true = MapAxis.from_energy_bounds(0.1,10,10, unit='TeV', name='energy_true')
>>> edisp = EDispKernel.from_gauss(
>>> energy_axis_true=energy_true, energy_axis=energy, sigma=0.1, bias=0
>>> )
Have a quick look:
>>> print(edisp)
EDispKernel
-----------
<BLANKLINE>
axes : ['energy_true', 'energy']
shape : (10, 10)
ndim : 2
unit :
dtype : float64
<BLANKLINE>
>>> edisp.peek()
"""
tag = "edisp_kernel"
required_axes = ["energy_true", "energy"]
default_interp_kwargs = dict(bounds_error=False, fill_value=0, method="nearest")
"""Default Interpolation kwargs for `~IRF`. Fill zeros and do not
interpolate"""
@property
def pdf_matrix(self):
"""Energy dispersion PDF matrix (`~numpy.ndarray`).
Rows (first index): True Energy
Columns (second index): Reco Energy
"""
return self.data
[docs] def pdf_in_safe_range(self, lo_threshold, hi_threshold):
"""PDF matrix with bins outside threshold set to 0.
Parameters
----------
lo_threshold : `~astropy.units.Quantity`
Low reco energy threshold
hi_threshold : `~astropy.units.Quantity`
High reco energy threshold
"""
data = self.pdf_matrix.copy()
energy = self.axes["energy"].edges
if lo_threshold is None and hi_threshold is None:
idx = slice(None)
else:
idx = (energy[:-1] < lo_threshold) | (energy[1:] > hi_threshold)
data[:, idx] = 0
return data
[docs] def to_image(self, lo_threshold=None, hi_threshold=None):
"""Return a 2D edisp by summing the pdf matrix over the ereco axis.
Parameters
----------
lo_threshold : `~astropy.units.Quantity`, optional
Low reco energy threshold
hi_threshold : `~astropy.units.Quantity`, optional
High reco energy threshold
"""
energy_axis = self.axes["energy"]
lo_threshold = lo_threshold or energy_axis.edges[0]
hi_threshold = hi_threshold or energy_axis.edges[-1]
data = self.pdf_in_safe_range(lo_threshold, hi_threshold)
return self.__class__(
axes=self.axes.squash("energy"),
data=np.sum(data, axis=1, keepdims=True),
)
[docs] @classmethod
def from_gauss(cls, energy_axis_true, energy_axis, sigma, bias, pdf_threshold=1e-6):
"""Create Gaussian energy dispersion matrix (`EnergyDispersion`).
Calls :func:`gammapy.irf.EnergyDispersion2D.from_gauss`
Parameters
----------
energy_axis_true : `~astropy.units.Quantity`
Bin edges of true energy axis
energy_axis : `~astropy.units.Quantity`
Bin edges of reconstructed energy axis
bias : float or `~numpy.ndarray`
Center of Gaussian energy dispersion, bias
sigma : float or `~numpy.ndarray`
RMS width of Gaussian energy dispersion, resolution
pdf_threshold : float, optional
Zero suppression threshold
Returns
-------
edisp : `EDispKernel`
Edisp kernel.
"""
from .core import EnergyDispersion2D
migra_axis = MapAxis.from_bounds(1.0 / 3, 3, nbin=200, name="migra")
# A dummy offset axis (need length 2 for interpolation to work)
offset_axis = MapAxis.from_edges([0, 1, 2], unit="deg", name="offset")
edisp = EnergyDispersion2D.from_gauss(
energy_axis_true=energy_axis_true,
migra_axis=migra_axis,
offset_axis=offset_axis,
sigma=sigma,
bias=bias,
pdf_threshold=pdf_threshold,
)
return edisp.to_edisp_kernel(
offset=offset_axis.center[0], energy=energy_axis.edges
)
[docs] @classmethod
def from_diagonal_response(cls, energy_axis_true, energy_axis=None):
"""Create energy dispersion from a diagonal response, i.e. perfect energy resolution
This creates the matrix corresponding to a perfect energy response.
It contains ones where the energy_true center is inside the e_reco bin.
It is a square diagonal matrix if energy_true = e_reco.
This is useful in cases where code always applies an edisp,
but you don't want it to do anything.
Parameters
----------
energy_axis_true, energy_axis : `MapAxis`
True and reconstructed energy axis
Examples
--------
If ``energy_true`` equals ``energy``, you get a diagonal matrix::
from gammapy.irf import EDispKernel
from gammapy.maps import MapAxis
energy_true_axis = MapAxis.from_energy_edges(
[0.5, 1, 2, 4, 6] * u.TeV, name="energy_true"
)
edisp = EDispKernel.from_diagonal_response(energy_true_axis)
edisp.plot_matrix()
Example with different energy binnings::
energy_true_axis = MapAxis.from_energy_edges(
[0.5, 1, 2, 4, 6] * u.TeV, name="energy_true"
)
energy_axis = MapAxis.from_energy_edges([2, 4, 6] * u.TeV)
edisp = EDispKernel.from_diagonal_response(energy_true_axis, energy_axis)
edisp.plot_matrix()
"""
from .map import get_overlap_fraction
energy_axis_true.assert_name("energy_true")
if energy_axis is None:
energy_axis = energy_axis_true.copy(name="energy")
data = get_overlap_fraction(energy_axis, energy_axis_true)
return cls(axes=[energy_axis_true, energy_axis], data=data.value)
[docs] @classmethod
def from_hdulist(cls, hdulist, hdu1="MATRIX", hdu2="EBOUNDS"):
"""Create `EnergyDispersion` object from `~astropy.io.fits.HDUList`.
Parameters
----------
hdulist : `~astropy.io.fits.HDUList`
HDU list with ``MATRIX`` and ``EBOUNDS`` extensions.
hdu1 : str, optional
HDU containing the energy dispersion matrix, default: MATRIX
hdu2 : str, optional
HDU containing the energy axis information, default, EBOUNDS
"""
matrix_hdu = hdulist[hdu1]
ebounds_hdu = hdulist[hdu2]
data = matrix_hdu.data
header = matrix_hdu.header
pdf_matrix = np.zeros([len(data), header["DETCHANS"]], dtype=np.float64)
for i, l in enumerate(data):
if l.field("N_GRP"):
m_start = 0
for k in range(l.field("N_GRP")):
chan_min = l.field("F_CHAN")[k]
chan_max = l.field("F_CHAN")[k] + l.field("N_CHAN")[k]
pdf_matrix[i, chan_min:chan_max] = l.field("MATRIX")[
m_start : m_start + l.field("N_CHAN")[k] # noqa: E203
]
m_start += l.field("N_CHAN")[k]
table = Table.read(ebounds_hdu)
energy_axis = MapAxis.from_table(table, format="ogip")
table = Table.read(matrix_hdu)
energy_axis_true = MapAxis.from_table(table, format="ogip-arf")
return cls(axes=[energy_axis_true, energy_axis], data=pdf_matrix)
[docs] @classmethod
def read(cls, filename, hdu1="MATRIX", hdu2="EBOUNDS"):
"""Read from file.
Parameters
----------
filename : `pathlib.Path`, str
File to read
hdu1 : str, optional
HDU containing the energy dispersion matrix, default: MATRIX
hdu2 : str, optional
HDU containing the energy axis information, default, EBOUNDS
"""
with fits.open(str(make_path(filename)), memmap=False) as hdulist:
return cls.from_hdulist(hdulist, hdu1=hdu1, hdu2=hdu2)
[docs] def to_hdulist(self, format="ogip", **kwargs):
"""Convert RMF to FITS HDU list format.
Parameters
----------
format : {"ogip", "ogip-sherpa"}
Format to use.
Returns
-------
hdulist : `~astropy.io.fits.HDUList`
RMF in HDU list format.
Notes
-----
For more info on the RMF FITS file format see:
https://heasarc.gsfc.nasa.gov/docs/heasarc/caldb/docs/summary/cal_gen_92_002_summary.html
"""
# Cannot use table_to_fits here due to variable length array
# http://docs.astropy.org/en/v1.0.4/io/fits/usage/unfamiliar.html
format_arf = format.replace("ogip", "ogip-arf")
table = self.to_table(format=format_arf)
name = table.meta.pop("name")
header = fits.Header()
header.update(table.meta)
cols = table.columns
c0 = fits.Column(
name=cols[0].name, format="E", array=cols[0], unit=str(cols[0].unit)
)
c1 = fits.Column(
name=cols[1].name, format="E", array=cols[1], unit=str(cols[1].unit)
)
c2 = fits.Column(name=cols[2].name, format="I", array=cols[2])
c3 = fits.Column(name=cols[3].name, format="PI()", array=cols[3])
c4 = fits.Column(name=cols[4].name, format="PI()", array=cols[4])
c5 = fits.Column(name=cols[5].name, format="PE()", array=cols[5])
hdu = fits.BinTableHDU.from_columns(
[c0, c1, c2, c3, c4, c5], header=header, name=name
)
ebounds_hdu = self.axes["energy"].to_table_hdu(format=format)
prim_hdu = fits.PrimaryHDU()
return fits.HDUList([prim_hdu, hdu, ebounds_hdu])
[docs] def to_table(self, format="ogip"):
"""Convert to `~astropy.table.Table`.
The output table is in the OGIP RMF format.
https://heasarc.gsfc.nasa.gov/docs/heasarc/caldb/docs/memos/cal_gen_92_002/cal_gen_92_002.html#Tab:1 # noqa: E501
Parameters
----------
format : {"ogip", "ogip-sherpa"}
Format to use.
Returns
-------
table : `~astropy.table.Table`
Matrix table
"""
table = self.axes["energy_true"].to_table(format=format)
rows = self.pdf_matrix.shape[0]
n_grp = []
f_chan = np.ndarray(dtype=np.object, shape=rows)
n_chan = np.ndarray(dtype=np.object, shape=rows)
matrix = np.ndarray(dtype=np.object, shape=rows)
# Make RMF type matrix
for idx, row in enumerate(self.data):
pos = np.nonzero(row)[0]
borders = np.where(np.diff(pos) != 1)[0]
# add 1 to borders for correct behaviour of np.split
groups = np.split(pos, borders + 1)
n_grp_temp = len(groups) if len(groups) > 0 else 1
n_chan_temp = np.asarray([val.size for val in groups])
try:
f_chan_temp = np.asarray([val[0] for val in groups])
except IndexError:
f_chan_temp = np.zeros(1)
n_grp.append(n_grp_temp)
f_chan[idx] = f_chan_temp
n_chan[idx] = n_chan_temp
matrix[idx] = row[pos]
n_grp = np.asarray(n_grp, dtype=np.int16)
# Get total number of groups and channel subsets
numgrp, numelt = 0, 0
for val, val2 in zip(n_grp, n_chan):
numgrp += np.sum(val)
numelt += np.sum(val2)
table["N_GRP"] = n_grp
table["F_CHAN"] = f_chan
table["N_CHAN"] = n_chan
table["MATRIX"] = matrix
table.meta = {
"name": "MATRIX",
"chantype": "PHA",
"hduclass": "OGIP",
"hduclas1": "RESPONSE",
"hduclas2": "RSP_MATRIX",
"detchans": self.axes["energy"].nbin,
"numgrp": numgrp,
"numelt": numelt,
"tlmin4": 0,
}
return table
[docs] def write(self, filename, format="ogip", **kwargs):
"""Write to file.
Parameters
----------
filename : str
Filename
format : {"ogip", "ogip-sherpa"}
Format to use.
"""
filename = str(make_path(filename))
self.to_hdulist(format=format).writeto(filename, **kwargs)
[docs] def get_resolution(self, energy_true):
"""Get energy resolution for a given true energy.
The resolution is given as a percentage of the true energy
Parameters
----------
energy_true : `~astropy.units.Quantity`
True energy
"""
energy_axis_true = self.axes["energy_true"]
var = self._get_variance(energy_true)
idx_true = energy_axis_true.coord_to_idx(energy_true)
energy_true_real = energy_axis_true.center[idx_true]
return np.sqrt(var) / energy_true_real
[docs] def get_bias(self, energy_true):
r"""Get reconstruction bias for a given true energy.
Bias is defined as
.. math:: \frac{E_{reco}-E_{true}}{E_{true}}
Parameters
----------
energy_true : `~astropy.units.Quantity`
True energy
"""
energy_axis_true = self.axes["energy_true"]
energy = self.get_mean(energy_true)
idx_true = energy_axis_true.coord_to_idx(energy_true)
energy_true_real = energy_axis_true.center[idx_true]
bias = (energy - energy_true_real) / energy_true_real
return bias
[docs] def get_bias_energy(self, bias, energy_min=None, energy_max=None):
"""Find energy corresponding to a given bias.
In case the solution is not unique, provide the ``energy_min`` or ``energy_max`` arguments
to limit the solution to the given range. By default the peak energy of the
bias is chosen as ``energy_min``.
Parameters
----------
bias : float
Bias value.
energy_min : `~astropy.units.Quantity`
Lower bracket value in case solution is not unique.
energy_max : `~astropy.units.Quantity`
Upper bracket value in case solution is not unique.
Returns
-------
bias_energy : `~astropy.units.Quantity`
Reconstructed energy corresponding to the given bias.
"""
from gammapy.modeling.models import TemplateSpectralModel
energy_true = self.axes["energy_true"].center
values = self.get_bias(energy_true)
if energy_min is None:
# use the peak bias energy as default minimum
energy_min = energy_true[np.nanargmax(values)]
if energy_max is None:
energy_max = energy_true[-1]
bias_spectrum = TemplateSpectralModel(energy=energy_true, values=values)
energy_true_bias = bias_spectrum.inverse(
Quantity(bias), energy_min=energy_min, energy_max=energy_max
)
if np.isnan(energy_true_bias[0]):
energy_true_bias[0] = energy_min
# return reconstructed energy
return energy_true_bias * (1 + bias)
[docs] def get_mean(self, energy_true):
"""Get mean reconstructed energy for a given true energy."""
idx = self.axes["energy_true"].coord_to_idx(energy_true)
pdf = self.data[idx]
# compute sum along reconstructed energy
norm = np.sum(pdf, axis=-1)
temp = np.sum(pdf * self.axes["energy"].center, axis=-1)
with np.errstate(invalid="ignore"):
# corm can be zero
mean = np.nan_to_num(temp / norm)
return mean
def _get_variance(self, energy_true):
"""Get variance of log reconstructed energy."""
# evaluate the pdf at given true energies
idx = self.axes["energy_true"].coord_to_idx(energy_true)
pdf = self.data[idx]
# compute mean
mean = self.get_mean(energy_true)
# create array of reconstructed-energy nodes
# for each given true energy value
# (first axis is reconstructed energy)
erec = self.axes["energy"].center
erec = np.repeat(erec, max(np.sum(mean.shape), 1)).reshape(
erec.shape + mean.shape
)
# compute deviation from mean
# (and move reconstructed energy axis to last axis)
temp_ = (erec - mean) ** 2
temp = np.rollaxis(temp_, 1)
# compute sum along reconstructed energy
# axis to determine the variance
norm = np.sum(pdf, axis=-1)
var = np.sum(temp * pdf, axis=-1)
return var / norm
[docs] def plot_matrix(self, ax=None, add_cbar=False, **kwargs):
"""Plot PDF matrix.
Parameters
----------
ax : `~matplotlib.axes.Axes`, optional
Axis
add_cbar : bool
Add a colorbar to the plot.
Returns
-------
ax : `~matplotlib.axes.Axes`
Axis
"""
kwargs.setdefault("cmap", "GnBu")
norm = PowerNorm(gamma=0.5, vmin=0, vmax=1)
kwargs.setdefault("norm", norm)
ax = plt.gca() if ax is None else ax
energy_axis_true = self.axes["energy_true"]
energy_axis = self.axes["energy"]
with quantity_support():
caxes = ax.pcolormesh(
energy_axis_true.edges, energy_axis.edges, self.data.T, **kwargs
)
if add_cbar:
label = "Probability density (A.U.)"
ax.figure.colorbar(caxes, ax=ax, label=label)
energy_axis_true.format_plot_xaxis(ax=ax)
energy_axis.format_plot_yaxis(ax=ax)
return ax
[docs] def plot_bias(self, ax=None, **kwargs):
"""Plot reconstruction bias.
See `~gammapy.irf.EnergyDispersion.get_bias` method.
Parameters
----------
ax : `~matplotlib.axes.Axes`, optional
Plot axis
**kwargs : dict
Kwyrow
Returns
-------
ax : `~matplotlib.axes.Axes`, optional
Plot axis
"""
ax = plt.gca() if ax is None else ax
energy = self.axes["energy_true"].center
bias = self.get_bias(energy)
with quantity_support():
ax.plot(energy, bias, **kwargs)
ax.set_xlabel(f"$E_\\mathrm{{True}}$ [{ax.yaxis.units}]")
ax.set_ylabel(
"($E_\\mathrm{{Reco}} - E_\\mathrm{{True}}) / E_\\mathrm{{True}}$"
)
ax.set_xscale("log")
return ax
[docs] def peek(self, figsize=(15, 5)):
"""Quick-look summary plots.
Parameters
----------
figsize : tuple
Size of the figure.
"""
fig, axes = plt.subplots(nrows=1, ncols=2, figsize=figsize)
self.plot_bias(ax=axes[0])
self.plot_matrix(ax=axes[1])
plt.tight_layout()