# Licensed under a 3-clause BSD style license - see LICENSE.rst
import numpy as np
from astropy.io import fits
from astropy.table import Table
from astropy.units import Quantity
from gammapy.maps import MapAxis
from gammapy.utils.nddata import NDDataArray
from gammapy.utils.scripts import make_path
__all__ = ["EDispKernel"]
[docs]class EDispKernel:
"""Energy dispersion matrix.
Data format specification: :ref:`gadf:ogip-rmf`
Parameters
----------
energy_axis_true : `~gammapy.maps.MapAxis`
True energy axis. Its name must be "energy_true"
energy_axis : `~gammapy.maps.MapAxis`
Reconstructed energy axis. Its name must be "energy"
data : array_like
2-dim energy dispersion matrix
Examples
--------
Create a Gaussian energy dispersion matrix::
from gammapy.maps import MapAxis
from gammapy.irf import EDispKernel
energy = MapAxis.from_energy_bounds(0.1,10,10, unit='TeV')
energy_true = MapAxis.from_energy_bounds(0.1,10,10, unit='TeV', name='energy_true')
edisp = EDispKernel.from_gauss(
energy_true=energy, energy=energy,
sigma=0.1, bias=0,
)
Have a quick look:
>>> print(edisp)
>>> edisp.peek()
"""
default_interp_kwargs = dict(bounds_error=False, fill_value=0, method="nearest")
"""Default Interpolation kwargs for `~NDDataArray`. Fill zeros and do not
interpolate"""
def __init__(
self, energy_axis_true, energy_axis, data, interp_kwargs=None, meta=None,
):
if interp_kwargs is None:
interp_kwargs = self.default_interp_kwargs
self.data = NDDataArray(
axes=[energy_axis_true, energy_axis], data=data, interp_kwargs=interp_kwargs
)
self.meta = meta or {}
def __str__(self):
ss = self.__class__.__name__
ss += f"\n{self.data}"
return ss
@property
def energy_axis(self):
"""Reconstructed energy axis (`~gammapy.maps.MapAxis`)"""
return self.data.axes["energy"]
@property
def energy_axis_true(self):
"""True energy axis (`~gammapy.maps.MapAxis`)"""
return self.data.axes["energy_true"]
@property
def pdf_matrix(self):
"""Energy dispersion PDF matrix (`~numpy.ndarray`).
Rows (first index): True Energy
Columns (second index): Reco Energy
"""
return self.data.data.value
[docs] def pdf_in_safe_range(self, lo_threshold, hi_threshold):
"""PDF matrix with bins outside threshold set to 0.
Parameters
----------
lo_threshold : `~astropy.units.Quantity`
Low reco energy threshold
hi_threshold : `~astropy.units.Quantity`
High reco energy threshold
"""
data = self.pdf_matrix.copy()
energy = self.energy_axis.edges
if lo_threshold is None and hi_threshold is None:
idx = slice(None)
else:
idx = (energy[:-1] < lo_threshold) | (energy[1:] > hi_threshold)
data[:, idx] = 0
return data
[docs] def to_image(self, lo_threshold=None, hi_threshold=None):
"""Return a 2D edisp by summing the pdf matrix over the ereco axis.
Parameters
----------
lo_threshold :`~astropy.units.Quantity`, optional
Low reco energy threshold
hi_threshold : `~astropy.units.Quantity`, optional
High reco energy threshold
"""
lo_threshold = lo_threshold or self.energy_axis.edges[0]
hi_threshold = hi_threshold or self.energy_axis.edges[-1]
data = self.pdf_in_safe_range(lo_threshold, hi_threshold)
return self.__class__(
energy_axis=self.energy_axis.squash(),
energy_axis_true=self.energy_axis_true,
data=np.sum(data, axis=1, keepdims=True),
)
[docs] @classmethod
def from_gauss(cls, energy_true, energy, sigma, bias, pdf_threshold=1e-6):
"""Create Gaussian energy dispersion matrix (`EnergyDispersion`).
Calls :func:`gammapy.irf.EnergyDispersion2D.from_gauss`
Parameters
----------
energy_true : `~astropy.units.Quantity`
Bin edges of true energy axis
energy : `~astropy.units.Quantity`
Bin edges of reconstructed energy axis
bias : float or `~numpy.ndarray`
Center of Gaussian energy dispersion, bias
sigma : float or `~numpy.ndarray`
RMS width of Gaussian energy dispersion, resolution
pdf_threshold : float, optional
Zero suppression threshold
"""
from .energy_dispersion import EnergyDispersion2D
migra = np.linspace(1.0 / 3, 3, 200)
# A dummy offset axis (need length 2 for interpolation to work)
offset = Quantity([0, 1, 2], "deg")
edisp = EnergyDispersion2D.from_gauss(
energy_true=energy_true,
migra=migra,
sigma=sigma,
bias=bias,
offset=offset,
pdf_threshold=pdf_threshold,
)
return edisp.to_edisp_kernel(offset=offset[0], energy=energy)
[docs] @classmethod
def from_diagonal_response(cls, energy_true, energy=None):
"""Create energy dispersion from a diagonal response, i.e. perfect energy resolution
This creates the matrix corresponding to a perfect energy response.
It contains ones where the energy_true center is inside the e_reco bin.
It is a square diagonal matrix if energy_true = e_reco.
This is useful in cases where code always applies an edisp,
but you don't want it to do anything.
Parameters
----------
energy_true, energy : `~astropy.units.Quantity`
Energy edges for true and reconstructed energy axis
Examples
--------
If ``energy_true`` equals ``energy``, you get a diagonal matrix::
energy_true = [0.5, 1, 2, 4, 6] * u.TeV
edisp = EnergyDispersion.from_diagonal_response(energy_true)
edisp.plot_matrix()
Example with different energy binnings::
energy_true = [0.5, 1, 2, 4, 6] * u.TeV
energy = [2, 4, 6] * u.TeV
edisp = EnergyDispersion.from_diagonal_response(energy_true, energy)
edisp.plot_matrix()
"""
from .edisp_map import get_overlap_fraction
if energy is None:
energy = energy_true
energy_axis = MapAxis.from_energy_edges(energy)
energy_axis_true = MapAxis.from_energy_edges(energy_true, name="energy_true")
data = get_overlap_fraction(energy_axis, energy_axis_true)
return cls(
energy_axis=energy_axis, energy_axis_true=energy_axis_true, data=data,
)
[docs] @classmethod
def from_hdulist(cls, hdulist, hdu1="MATRIX", hdu2="EBOUNDS"):
"""Create `EnergyDispersion` object from `~astropy.io.fits.HDUList`.
Parameters
----------
hdulist : `~astropy.io.fits.HDUList`
HDU list with ``MATRIX`` and ``EBOUNDS`` extensions.
hdu1 : str, optional
HDU containing the energy dispersion matrix, default: MATRIX
hdu2 : str, optional
HDU containing the energy axis information, default, EBOUNDS
"""
matrix_hdu = hdulist[hdu1]
ebounds_hdu = hdulist[hdu2]
data = matrix_hdu.data
header = matrix_hdu.header
pdf_matrix = np.zeros([len(data), header["DETCHANS"]], dtype=np.float64)
for i, l in enumerate(data):
if l.field("N_GRP"):
m_start = 0
for k in range(l.field("N_GRP")):
pdf_matrix[
i,
l.field("F_CHAN")[k] : l.field("F_CHAN")[k]
+ l.field("N_CHAN")[k],
] = l.field("MATRIX")[m_start : m_start + l.field("N_CHAN")[k]]
m_start += l.field("N_CHAN")[k]
table = Table.read(ebounds_hdu)
energy_axis = MapAxis.from_table(table, format="ogip")
table = Table.read(matrix_hdu)
energy_axis_true = MapAxis.from_table(table, format="ogip-arf")
return cls(
energy_axis=energy_axis, energy_axis_true=energy_axis_true, data=pdf_matrix,
)
[docs] @classmethod
def read(cls, filename, hdu1="MATRIX", hdu2="EBOUNDS"):
"""Read from file.
Parameters
----------
filename : `pathlib.Path`, str
File to read
hdu1 : str, optional
HDU containing the energy dispersion matrix, default: MATRIX
hdu2 : str, optional
HDU containing the energy axis information, default, EBOUNDS
"""
with fits.open(str(make_path(filename)), memmap=False) as hdulist:
return cls.from_hdulist(hdulist, hdu1=hdu1, hdu2=hdu2)
[docs] def to_hdulist(self, use_sherpa=False, **kwargs):
"""Convert RMF to FITS HDU list format.
Parameters
----------
header : `~astropy.io.fits.Header`
Header to be written in the fits file.
energy_unit : str
Unit in which the energy is written in the HDU list
Returns
-------
hdulist : `~astropy.io.fits.HDUList`
RMF in HDU list format.
Notes
-----
For more info on the RMF FITS file format see:
https://heasarc.gsfc.nasa.gov/docs/heasarc/caldb/docs/summary/cal_gen_92_002_summary.html
"""
# Cannot use table_to_fits here due to variable length array
# http://docs.astropy.org/en/v1.0.4/io/fits/usage/unfamiliar.html
table = self.to_table()
name = table.meta.pop("name")
header = fits.Header()
header.update(table.meta)
if use_sherpa:
table["ENERG_HI"] = table["ENERG_HI"].quantity.to("keV")
table["ENERG_LO"] = table["ENERG_LO"].quantity.to("keV")
cols = table.columns
c0 = fits.Column(
name=cols[0].name, format="E", array=cols[0], unit=str(cols[0].unit)
)
c1 = fits.Column(
name=cols[1].name, format="E", array=cols[1], unit=str(cols[1].unit)
)
c2 = fits.Column(name=cols[2].name, format="I", array=cols[2])
c3 = fits.Column(name=cols[3].name, format="PI()", array=cols[3])
c4 = fits.Column(name=cols[4].name, format="PI()", array=cols[4])
c5 = fits.Column(name=cols[5].name, format="PE()", array=cols[5])
hdu = fits.BinTableHDU.from_columns(
[c0, c1, c2, c3, c4, c5], header=header, name=name
)
hdu_format = "ogip-sherpa" if use_sherpa else "ogip"
ebounds_hdu = self.energy_axis.to_table_hdu(format=hdu_format)
prim_hdu = fits.PrimaryHDU()
return fits.HDUList([prim_hdu, hdu, ebounds_hdu])
[docs] def to_table(self):
"""Convert to `~astropy.table.Table`.
The output table is in the OGIP RMF format.
https://heasarc.gsfc.nasa.gov/docs/heasarc/caldb/docs/memos/cal_gen_92_002/cal_gen_92_002.html#Tab:1
"""
rows = self.pdf_matrix.shape[0]
n_grp = []
f_chan = np.ndarray(dtype=np.object, shape=rows)
n_chan = np.ndarray(dtype=np.object, shape=rows)
matrix = np.ndarray(dtype=np.object, shape=rows)
# Make RMF type matrix
for i, row in enumerate(self.data.data.value):
pos = np.nonzero(row)[0]
borders = np.where(np.diff(pos) != 1)[0]
# add 1 to borders for correct behaviour of np.split
groups = np.asarray(np.split(pos, borders + 1))
n_grp_temp = groups.shape[0] if groups.size > 0 else 1
n_chan_temp = np.asarray([val.size for val in groups])
try:
f_chan_temp = np.asarray([val[0] for val in groups])
except IndexError:
f_chan_temp = np.zeros(1)
n_grp.append(n_grp_temp)
f_chan[i] = f_chan_temp
n_chan[i] = n_chan_temp
matrix[i] = row[pos]
n_grp = np.asarray(n_grp, dtype=np.int16)
# Get total number of groups and channel subsets
numgrp, numelt = 0, 0
for val, val2 in zip(n_grp, n_chan):
numgrp += np.sum(val)
numelt += np.sum(val2)
table = Table()
energy = self.energy_axis_true.edges
table["ENERG_LO"] = energy[:-1]
table["ENERG_HI"] = energy[1:]
table["N_GRP"] = n_grp
table["F_CHAN"] = f_chan
table["N_CHAN"] = n_chan
table["MATRIX"] = matrix
table.meta = {
"name": "MATRIX",
"chantype": "PHA",
"hduclass": "OGIP",
"hduclas1": "RESPONSE",
"hduclas2": "RSP_MATRIX",
"detchans": self.energy_axis.nbin,
"numgrp": numgrp,
"numelt": numelt,
"tlmin4": 0,
}
return table
[docs] def write(self, filename, use_sherpa=False, **kwargs):
"""Write to file."""
filename = str(make_path(filename))
self.to_hdulist(use_sherpa=use_sherpa).writeto(filename, **kwargs)
[docs] def get_resolution(self, energy_true):
"""Get energy resolution for a given true energy.
The resolution is given as a percentage of the true energy
Parameters
----------
energy_true : `~astropy.units.Quantity`
True energy
"""
var = self._get_variance(energy_true)
idx_true = self.energy_axis_true.coord_to_idx(energy_true)
energy_true_real = self.energy_axis_true.center[idx_true]
return np.sqrt(var) / energy_true_real
[docs] def get_bias(self, energy_true):
r"""Get reconstruction bias for a given true energy.
Bias is defined as
.. math:: \frac{E_{reco}-E_{true}}{E_{true}}
Parameters
----------
energy_true : `~astropy.units.Quantity`
True energy
"""
energy = self.get_mean(energy_true)
idx_true = self.energy_axis_true.coord_to_idx(energy_true)
energy_true_real = self.energy_axis_true.center[idx_true]
bias = (energy - energy_true_real) / energy_true_real
return bias
[docs] def get_bias_energy(self, bias, energy_min=None, energy_max=None):
"""Find energy corresponding to a given bias.
In case the solution is not unique, provide the ``energy_min`` or ``energy_max`` arguments
to limit the solution to the given range. By default the peak energy of the
bias is chosen as ``energy_min``.
Parameters
----------
bias : float
Bias value.
energy_min : `~astropy.units.Quantity`
Lower bracket value in case solution is not unique.
energy_max : `~astropy.units.Quantity`
Upper bracket value in case solution is not unique.
Returns
-------
bias_energy : `~astropy.units.Quantity`
Reconstructed energy corresponding to the given bias.
"""
from gammapy.modeling.models import TemplateSpectralModel
energy_true = self.energy_axis_true.center
values = self.get_bias(energy_true)
if energy_min is None:
# use the peak bias energy as default minimum
energy_min = energy_true[np.nanargmax(values)]
if energy_max is None:
energy_max = energy_true[-1]
bias_spectrum = TemplateSpectralModel(energy_true, values)
energy_true_bias = bias_spectrum.inverse(
Quantity(bias), energy_min=energy_min, energy_max=energy_max
)
# return reconstructed energy
return energy_true_bias * (1 + bias)
[docs] def get_mean(self, energy_true):
"""Get mean reconstructed energy for a given true energy."""
# find pdf for true energies
idx = self.energy_axis_true.coord_to_idx(energy_true)
pdf = self.data.data[idx]
# compute sum along reconstructed energy
# axis to determine the mean
norm = np.sum(pdf, axis=-1)
temp = np.sum(pdf * self.energy_axis.center, axis=-1)
with np.errstate(invalid="ignore"):
# corm can be zero
mean = np.nan_to_num(temp / norm)
return mean
def _get_variance(self, energy_true):
"""Get variance of log reconstructed energy."""
# evaluate the pdf at given true energies
idx = self.energy_axis_true.coord_to_idx(energy_true)
pdf = self.data.data[idx]
# compute mean
mean = self.get_mean(energy_true)
# create array of reconstructed-energy nodes
# for each given true energy value
# (first axis is reconstructed energy)
erec = self.energy_axis.center
erec = np.repeat(erec, max(np.sum(mean.shape), 1)).reshape(
erec.shape + mean.shape
)
# compute deviation from mean
# (and move reconstructed energy axis to last axis)
temp_ = (erec - mean) ** 2
temp = np.rollaxis(temp_, 1)
# compute sum along reconstructed energy
# axis to determine the variance
norm = np.sum(pdf, axis=-1)
var = np.sum(temp * pdf, axis=-1)
return var / norm
[docs] def plot_matrix(self, ax=None, show_energy=None, add_cbar=False, **kwargs):
"""Plot PDF matrix.
Parameters
----------
ax : `~matplotlib.axes.Axes`, optional
Axis
show_energy : `~astropy.units.Quantity`, optional
Show energy, e.g. threshold, as vertical line
add_cbar : bool
Add a colorbar to the plot.
Returns
-------
ax : `~matplotlib.axes.Axes`
Axis
"""
import matplotlib.pyplot as plt
from matplotlib.colors import PowerNorm
kwargs.setdefault("cmap", "GnBu")
norm = PowerNorm(gamma=0.5)
kwargs.setdefault("norm", norm)
ax = plt.gca() if ax is None else ax
energy_true = self.energy_axis_true.edges
energy = self.energy_axis.edges
x = energy_true.value
y = energy.value
z = self.pdf_matrix
caxes = ax.pcolormesh(x, y, z.T, **kwargs)
if show_energy is not None:
ener_val = show_energy.to_value(self.energy_axis.unit)
ax.hlines(ener_val, 0, 200200, linestyles="dashed")
if add_cbar:
label = "Probability density (A.U.)"
cbar = ax.figure.colorbar(caxes, ax=ax, label=label)
ax.set_xlabel(fr"$E_\mathrm{{True}}$ [{energy_true.unit}]")
ax.set_ylabel(fr"$E_\mathrm{{Reco}}$ [{energy.unit}]")
ax.set_xscale("log")
ax.set_yscale("log")
ax.set_xlim(x.min(), x.max())
ax.set_ylim(y.min(), y.max())
return ax
[docs] def plot_bias(self, ax=None, **kwargs):
"""Plot reconstruction bias.
See `~gammapy.irf.EnergyDispersion.get_bias` method.
Parameters
----------
ax : `~matplotlib.axes.Axes`, optional
Axis
"""
import matplotlib.pyplot as plt
ax = plt.gca() if ax is None else ax
x = self.energy_axis_true.center.to_value("TeV")
y = self.get_bias(self.energy_axis_true.center)
ax.plot(x, y, **kwargs)
ax.set_xlabel(r"$E_\mathrm{{True}}$ [TeV]")
ax.set_ylabel(r"($E_\mathrm{{Reco}} - E_\mathrm{{True}}) / E_\mathrm{{True}}$")
ax.set_xscale("log")
return ax
[docs] def peek(self, figsize=(15, 5)):
"""Quick-look summary plot."""
import matplotlib.pyplot as plt
fig, axes = plt.subplots(nrows=1, ncols=2, figsize=figsize)
self.plot_bias(ax=axes[0])
self.plot_matrix(ax=axes[1])
plt.tight_layout()