Source code for gammapy.irf.edisp_kernel

# Licensed under a 3-clause BSD style license - see LICENSE.rst
import numpy as np
from astropy.io import fits
from astropy.table import Table
from astropy.units import Quantity
from gammapy.maps import MapAxis
from gammapy.maps.utils import edges_from_lo_hi
from gammapy.utils.fits import energy_axis_to_ebounds
from gammapy.utils.nddata import NDDataArray
from gammapy.utils.scripts import make_path

__all__ = ["EDispKernel"]


[docs]class EDispKernel: """Energy dispersion matrix. Data format specification: :ref:`gadf:ogip-rmf` Parameters ---------- e_true_lo, e_true_hi : `~astropy.units.Quantity` True energy axis binning e_reco_lo, e_reco_hi : `~astropy.units.Quantity` Reconstructed energy axis binning data : array_like 2-dim energy dispersion matrix Examples -------- Create a Gaussian energy dispersion matrix:: import numpy as np import astropy.units as u from gammapy.irf import EDispKernel energy = np.logspace(0, 1, 101) * u.TeV edisp = EDispKernel.from_gauss( e_true=energy, e_reco=energy, sigma=0.1, bias=0, ) Have a quick look: >>> print(edisp) >>> edisp.peek() See Also -------- EnergyDispersion2D """ default_interp_kwargs = dict(bounds_error=False, fill_value=0, method="nearest") """Default Interpolation kwargs for `~NDDataArray`. Fill zeros and do not interpolate""" def __init__( self, e_true_lo, e_true_hi, e_reco_lo, e_reco_hi, data, interp_kwargs=None, meta=None, ): if interp_kwargs is None: interp_kwargs = self.default_interp_kwargs e_true_edges = edges_from_lo_hi(e_true_lo, e_true_hi) e_true_axis = MapAxis.from_edges(e_true_edges, interp="log", name="energy_true") e_reco_edges = edges_from_lo_hi(e_reco_lo, e_reco_hi) e_reco_axis = MapAxis.from_edges(e_reco_edges, interp="log", name="energy") self.data = NDDataArray( axes=[e_true_axis, e_reco_axis], data=data, interp_kwargs=interp_kwargs ) self.meta = meta or {} def __str__(self): ss = self.__class__.__name__ ss += f"\n{self.data}" return ss @property def e_reco(self): """Reconstructed energy axis (`~gammapy.maps.MapAxis`)""" return self.data.axis("energy") @property def e_true(self): """True energy axis (`~gammapy.maps.MapAxis`)""" return self.data.axis("energy_true") @property def pdf_matrix(self): """Energy dispersion PDF matrix (`~numpy.ndarray`). Rows (first index): True Energy Columns (second index): Reco Energy """ return self.data.data.value
[docs] def pdf_in_safe_range(self, lo_threshold, hi_threshold): """PDF matrix with bins outside threshold set to 0. Parameters ---------- lo_threshold : `~astropy.units.Quantity` Low reco energy threshold hi_threshold : `~astropy.units.Quantity` High reco energy threshold """ data = self.pdf_matrix.copy() energy = self.e_reco.edges if lo_threshold is None and hi_threshold is None: idx = slice(None) else: idx = (energy[:-1] < lo_threshold) | (energy[1:] > hi_threshold) data[:, idx] = 0 return data
[docs] @classmethod def from_gauss(cls, e_true, e_reco, sigma, bias, pdf_threshold=1e-6): """Create Gaussian energy dispersion matrix (`EnergyDispersion`). Calls :func:`gammapy.irf.EnergyDispersion2D.from_gauss` Parameters ---------- e_true : `~astropy.units.Quantity` Bin edges of true energy axis e_reco : `~astropy.units.Quantity` Bin edges of reconstructed energy axis bias : float or `~numpy.ndarray` Center of Gaussian energy dispersion, bias sigma : float or `~numpy.ndarray` RMS width of Gaussian energy dispersion, resolution pdf_threshold : float, optional Zero suppression threshold """ from .energy_dispersion import EnergyDispersion2D migra = np.linspace(1.0 / 3, 3, 200) # A dummy offset axis (need length 2 for interpolation to work) offset = Quantity([0, 1, 2], "deg") edisp = EnergyDispersion2D.from_gauss( e_true=e_true, migra=migra, sigma=sigma, bias=bias, offset=offset, pdf_threshold=pdf_threshold, ) return edisp.to_energy_dispersion(offset=offset[0], e_reco=e_reco)
[docs] @classmethod def from_diagonal_response(cls, e_true, e_reco=None): """Create energy dispersion from a diagonal response, i.e. perfect energy resolution This creates the matrix corresponding to a perfect energy response. It contains ones where the e_true center is inside the e_reco bin. It is a square diagonal matrix if e_true = e_reco. This is useful in cases where code always applies an edisp, but you don't want it to do anything. Parameters ---------- e_true, e_reco : `~astropy.units.Quantity` Energy bounds for true and reconstructed energy axis Examples -------- If ``e_true`` equals ``e_reco``, you get a diagonal matrix:: e_true = [0.5, 1, 2, 4, 6] * u.TeV edisp = EnergyDispersion.from_diagonal_response(e_true) edisp.plot_matrix() Example with different energy binnings:: e_true = [0.5, 1, 2, 4, 6] * u.TeV e_reco = [2, 4, 6] * u.TeV edisp = EnergyDispersion.from_diagonal_response(e_true, e_reco) edisp.plot_matrix() """ if e_reco is None: e_reco = e_true e_true_center = 0.5 * (e_true[1:] + e_true[:-1]) etrue_2d, ereco_lo_2d = np.meshgrid(e_true_center, e_reco[:-1]) etrue_2d, ereco_hi_2d = np.meshgrid(e_true_center, e_reco[1:]) data = np.logical_and(etrue_2d >= ereco_lo_2d, etrue_2d < ereco_hi_2d) data = np.transpose(data).astype("float") return cls( e_true_lo=e_true[:-1], e_true_hi=e_true[1:], e_reco_lo=e_reco[:-1], e_reco_hi=e_reco[1:], data=data, )
[docs] @classmethod def from_hdulist(cls, hdulist, hdu1="MATRIX", hdu2="EBOUNDS"): """Create `EnergyDispersion` object from `~astropy.io.fits.HDUList`. Parameters ---------- hdulist : `~astropy.io.fits.HDUList` HDU list with ``MATRIX`` and ``EBOUNDS`` extensions. hdu1 : str, optional HDU containing the energy dispersion matrix, default: MATRIX hdu2 : str, optional HDU containing the energy axis information, default, EBOUNDS """ matrix_hdu = hdulist[hdu1] ebounds_hdu = hdulist[hdu2] data = matrix_hdu.data header = matrix_hdu.header pdf_matrix = np.zeros([len(data), header["DETCHANS"]], dtype=np.float64) for i, l in enumerate(data): if l.field("N_GRP"): m_start = 0 for k in range(l.field("N_GRP")): pdf_matrix[ i, l.field("F_CHAN")[k] : l.field("F_CHAN")[k] + l.field("N_CHAN")[k], ] = l.field("MATRIX")[m_start : m_start + l.field("N_CHAN")[k]] m_start += l.field("N_CHAN")[k] unit = ebounds_hdu.header.get("TUNIT2") e_reco_lo = Quantity(ebounds_hdu.data["E_MIN"], unit=unit) e_reco_hi = Quantity(ebounds_hdu.data["E_MAX"], unit=unit) unit = matrix_hdu.header.get("TUNIT1") e_true_lo = Quantity(matrix_hdu.data["ENERG_LO"], unit=unit) e_true_hi = Quantity(matrix_hdu.data["ENERG_HI"], unit=unit) return cls( e_true_lo=e_true_lo, e_true_hi=e_true_hi, e_reco_lo=e_reco_lo, e_reco_hi=e_reco_hi, data=pdf_matrix, )
[docs] @classmethod def read(cls, filename, hdu1="MATRIX", hdu2="EBOUNDS"): """Read from file. Parameters ---------- filename : `pathlib.Path`, str File to read hdu1 : str, optional HDU containing the energy dispersion matrix, default: MATRIX hdu2 : str, optional HDU containing the energy axis information, default, EBOUNDS """ with fits.open(make_path(filename), memmap=False) as hdulist: return cls.from_hdulist(hdulist, hdu1=hdu1, hdu2=hdu2)
[docs] def to_hdulist(self, use_sherpa=False, **kwargs): """Convert RMF to FITS HDU list format. Parameters ---------- header : `~astropy.io.fits.Header` Header to be written in the fits file. energy_unit : str Unit in which the energy is written in the HDU list Returns ------- hdulist : `~astropy.io.fits.HDUList` RMF in HDU list format. Notes ----- For more info on the RMF FITS file format see: https://heasarc.gsfc.nasa.gov/docs/heasarc/caldb/docs/summary/cal_gen_92_002_summary.html """ # Cannot use table_to_fits here due to variable length array # http://docs.astropy.org/en/v1.0.4/io/fits/usage/unfamiliar.html table = self.to_table() name = table.meta.pop("name") header = fits.Header() header.update(table.meta) if use_sherpa: table["ENERG_HI"] = table["ENERG_HI"].quantity.to("keV") table["ENERG_LO"] = table["ENERG_LO"].quantity.to("keV") cols = table.columns c0 = fits.Column( name=cols[0].name, format="E", array=cols[0], unit=str(cols[0].unit) ) c1 = fits.Column( name=cols[1].name, format="E", array=cols[1], unit=str(cols[1].unit) ) c2 = fits.Column(name=cols[2].name, format="I", array=cols[2]) c3 = fits.Column(name=cols[3].name, format="PI()", array=cols[3]) c4 = fits.Column(name=cols[4].name, format="PI()", array=cols[4]) c5 = fits.Column(name=cols[5].name, format="PE()", array=cols[5]) hdu = fits.BinTableHDU.from_columns( [c0, c1, c2, c3, c4, c5], header=header, name=name ) energy = self.e_reco.edges if use_sherpa: energy = energy.to("keV") ebounds = energy_axis_to_ebounds(energy) prim_hdu = fits.PrimaryHDU() return fits.HDUList([prim_hdu, hdu, ebounds])
[docs] def to_table(self): """Convert to `~astropy.table.Table`. The output table is in the OGIP RMF format. https://heasarc.gsfc.nasa.gov/docs/heasarc/caldb/docs/memos/cal_gen_92_002/cal_gen_92_002.html#Tab:1 """ rows = self.pdf_matrix.shape[0] n_grp = [] f_chan = np.ndarray(dtype=np.object, shape=rows) n_chan = np.ndarray(dtype=np.object, shape=rows) matrix = np.ndarray(dtype=np.object, shape=rows) # Make RMF type matrix for i, row in enumerate(self.data.data.value): pos = np.nonzero(row)[0] borders = np.where(np.diff(pos) != 1)[0] # add 1 to borders for correct behaviour of np.split groups = np.asarray(np.split(pos, borders + 1)) n_grp_temp = groups.shape[0] if groups.size > 0 else 1 n_chan_temp = np.asarray([val.size for val in groups]) try: f_chan_temp = np.asarray([val[0] for val in groups]) except IndexError: f_chan_temp = np.zeros(1) n_grp.append(n_grp_temp) f_chan[i] = f_chan_temp n_chan[i] = n_chan_temp matrix[i] = row[pos] n_grp = np.asarray(n_grp, dtype=np.int16) # Get total number of groups and channel subsets numgrp, numelt = 0, 0 for val, val2 in zip(n_grp, n_chan): numgrp += np.sum(val) numelt += np.sum(val2) table = Table() energy = self.e_true.edges table["ENERG_LO"] = energy[:-1] table["ENERG_HI"] = energy[1:] table["N_GRP"] = n_grp table["F_CHAN"] = f_chan table["N_CHAN"] = n_chan table["MATRIX"] = matrix table.meta = { "name": "MATRIX", "chantype": "PHA", "hduclass": "OGIP", "hduclas1": "RESPONSE", "hduclas2": "RSP_MATRIX", "detchans": self.e_reco.nbin, "numgrp": numgrp, "numelt": numelt, "tlmin4": 0, } return table
[docs] def write(self, filename, use_sherpa=False, **kwargs): """Write to file.""" filename = make_path(filename) self.to_hdulist(use_sherpa=use_sherpa).writeto(filename, **kwargs)
[docs] def get_resolution(self, e_true): """Get energy resolution for a given true energy. The resolution is given as a percentage of the true energy Parameters ---------- e_true : `~astropy.units.Quantity` True energy """ var = self._get_variance(e_true) idx_true = self.e_true.coord_to_idx(e_true) e_true_real = self.e_true.center[idx_true] return np.sqrt(var) / e_true_real
[docs] def get_bias(self, e_true): r"""Get reconstruction bias for a given true energy. Bias is defined as .. math:: \frac{E_{reco}-E_{true}}{E_{true}} Parameters ---------- e_true : `~astropy.units.Quantity` True energy """ e_reco = self.get_mean(e_true) idx_true = self.e_true.coord_to_idx(e_true) e_true_real = self.e_true.center[idx_true] bias = (e_reco - e_true_real) / e_true_real return bias
[docs] def get_bias_energy(self, bias, emin=None, emax=None): """Find energy corresponding to a given bias. In case the solution is not unique, provide the ``emin`` or ``emax`` arguments to limit the solution to the given range. By default the peak energy of the bias is chosen as ``emin``. Parameters ---------- bias : float Bias value. emin : `~astropy.units.Quantity` Lower bracket value in case solution is not unique. emax : `~astropy.units.Quantity` Upper bracket value in case solution is not unique. Returns ------- bias_energy : `~astropy.units.Quantity` Reconstructed energy corresponding to the given bias. """ from gammapy.modeling.models import TemplateSpectralModel e_true = self.e_true.center values = self.get_bias(e_true) if emin is None: # use the peak bias energy as default minimum emin = e_true[np.nanargmax(values)] if emax is None: emax = e_true[-1] bias_spectrum = TemplateSpectralModel(e_true, values) e_true_bias = bias_spectrum.inverse(Quantity(bias), emin=emin, emax=emax) # return reconstructed energy return e_true_bias * (1 + bias)
[docs] def get_mean(self, e_true): """Get mean reconstructed energy for a given true energy.""" # find pdf for true energies idx = self.e_true.coord_to_idx(e_true) pdf = self.data.data[idx] # compute sum along reconstructed energy # axis to determine the mean norm = np.sum(pdf, axis=-1) temp = np.sum(pdf * self.e_reco.center, axis=-1) with np.errstate(invalid="ignore"): # corm can be zero mean = np.nan_to_num(temp / norm) return mean
def _get_variance(self, e_true): """Get variance of log reconstructed energy.""" # evaluate the pdf at given true energies idx = self.e_true.coord_to_idx(e_true) pdf = self.data.data[idx] # compute mean mean = self.get_mean(e_true) # create array of reconstructed-energy nodes # for each given true energy value # (first axis is reconstructed energy) erec = self.e_reco.center erec = np.repeat(erec, max(np.sum(mean.shape), 1)).reshape( erec.shape + mean.shape ) # compute deviation from mean # (and move reconstructed energy axis to last axis) temp_ = (erec - mean) ** 2 temp = np.rollaxis(temp_, 1) # compute sum along reconstructed energy # axis to determine the variance norm = np.sum(pdf, axis=-1) var = np.sum(temp * pdf, axis=-1) return var / norm
[docs] def plot_matrix(self, ax=None, show_energy=None, add_cbar=False, **kwargs): """Plot PDF matrix. Parameters ---------- ax : `~matplotlib.axes.Axes`, optional Axis show_energy : `~astropy.units.Quantity`, optional Show energy, e.g. threshold, as vertical line add_cbar : bool Add a colorbar to the plot. Returns ------- ax : `~matplotlib.axes.Axes` Axis """ import matplotlib.pyplot as plt from matplotlib.colors import PowerNorm kwargs.setdefault("cmap", "GnBu") norm = PowerNorm(gamma=0.5) kwargs.setdefault("norm", norm) ax = plt.gca() if ax is None else ax e_true = self.e_true.edges e_reco = self.e_reco.edges x = e_true.value y = e_reco.value z = self.pdf_matrix caxes = ax.pcolormesh(x, y, z.T, **kwargs) if show_energy is not None: ener_val = show_energy.to_value(self.reco_energy.unit) ax.hlines(ener_val, 0, 200200, linestyles="dashed") if add_cbar: label = "Probability density (A.U.)" cbar = ax.figure.colorbar(caxes, ax=ax, label=label) ax.set_xlabel(fr"$E_\mathrm{{True}}$ [{e_true.unit}]") ax.set_ylabel(fr"$E_\mathrm{{Reco}}$ [{e_reco.unit}]") ax.set_xscale("log") ax.set_yscale("log") ax.set_xlim(x.min(), x.max()) ax.set_ylim(y.min(), y.max()) return ax
[docs] def plot_bias(self, ax=None, **kwargs): """Plot reconstruction bias. See `~gammapy.irf.EnergyDispersion.get_bias` method. Parameters ---------- ax : `~matplotlib.axes.Axes`, optional Axis """ import matplotlib.pyplot as plt ax = plt.gca() if ax is None else ax x = self.e_true.center.to_value("TeV") y = self.get_bias(self.e_true.center) ax.plot(x, y, **kwargs) ax.set_xlabel(r"$E_\mathrm{{True}}$ [TeV]") ax.set_ylabel(r"($E_\mathrm{{True}} - E_\mathrm{{Reco}} / E_\mathrm{{True}}$)") ax.set_xscale("log") return ax
[docs] def peek(self, figsize=(15, 5)): """Quick-look summary plot.""" import matplotlib.pyplot as plt fig, axes = plt.subplots(nrows=1, ncols=2, figsize=figsize) self.plot_bias(ax=axes[0]) self.plot_matrix(ax=axes[1]) plt.tight_layout()