Source code for gammapy.spectrum.core

# Licensed under a 3-clause BSD style license - see LICENSE.rst
import copy
import logging
import numpy as np
import astropy.units as u
from astropy.io import fits
from astropy.table import Table
from gammapy.maps import MapAxis
from gammapy.maps.utils import edges_from_lo_hi
from gammapy.utils.fits import ebounds_to_energy_axis, energy_axis_to_ebounds
from gammapy.utils.regions import compound_region_to_list
from gammapy.utils.scripts import make_path

__all__ = ["CountsSpectrum"]

log = logging.getLogger(__name__)


[docs]class CountsSpectrum: """Generic counts spectrum. Parameters ---------- energy_lo : `~astropy.units.Quantity` Lower bin edges of energy axis energy_hi : `~astropy.units.Quantity` Upper bin edges of energy axis data : `~numpy.ndarray` Spectrum data. unit : str or `~astropy.units.Unit` Data unit region : `~regions.SkyRegion` Region the spectrum is defined for. Examples -------- .. plot:: :include-source: from gammapy.spectrum import CountsSpectrum import numpy as np import astropy.units as u ebounds = np.logspace(0,1,11) * u.TeV data = np.arange(10) spec = CountsSpectrum( energy_lo=ebounds[:-1], energy_hi=ebounds[1:], data=data, ) spec.plot(show_poisson_errors=True) """ def __init__(self, energy_lo, energy_hi, data=None, unit="", region=None): e_edges = edges_from_lo_hi(energy_lo, energy_hi) self.energy = MapAxis.from_edges(e_edges, interp="log", name="energy") if data is None: data = np.zeros(self.energy.nbin) self.data = np.array(data) if not self.energy.nbin == self.data.size: raise ValueError("Incompatible data and energy axis size.") self.unit = u.Unit(unit) self.region = region @property def quantity(self): return self.data * self.unit @quantity.setter def quantity(self, quantity): self.data = quantity.value self.unit = quantity.unit
[docs] @classmethod def from_hdulist(cls, hdulist, hdu1="COUNTS", hdu2="EBOUNDS"): """Read from HDU list in OGIP format.""" table = Table.read(hdulist[hdu1]) counts = table["COUNTS"].data ebounds = ebounds_to_energy_axis(hdulist[hdu2]) # TODO: add region serilisation return cls(data=counts, energy_lo=ebounds[:-1], energy_hi=ebounds[1:])
[docs] @classmethod def read(cls, filename, hdu1="COUNTS", hdu2="EBOUNDS"): """Read from file.""" with fits.open(make_path(filename), memmap=False) as hdulist: return cls.from_hdulist(hdulist, hdu1=hdu1, hdu2=hdu2)
[docs] def to_table(self): """Convert to `~astropy.table.Table`. Data format specification: :ref:`gadf:ogip-pha` """ channel = np.arange(self.energy.nbin, dtype=np.int16) counts = np.array(self.data, dtype=np.int32) names = ["CHANNEL", "COUNTS"] meta = {"name": "COUNTS"} return Table([channel, counts], names=names, meta=meta)
[docs] def to_hdulist(self, use_sherpa=False): """Convert to `~astropy.io.fits.HDUList`. This adds an ``EBOUNDS`` extension to the ``BinTableHDU`` produced by ``to_table``, in order to store the energy axis """ table = self.to_table() name = table.meta["name"] hdu = fits.BinTableHDU(table, name=name) energy = self.energy.edges if use_sherpa: energy = energy.to("keV") ebounds = energy_axis_to_ebounds(energy) return fits.HDUList([fits.PrimaryHDU(), hdu, ebounds])
[docs] def write(self, filename, use_sherpa=False, **kwargs): """Write to file.""" filename = make_path(filename) self.to_hdulist(use_sherpa=use_sherpa).writeto(filename, **kwargs)
[docs] def fill_events(self, events): """Fill events (`gammapy.data.EventList`).""" self.fill_energy(events.energy)
[docs] def fill_energy(self, energy): """Fill energy values (`~astropy.units.Quantity`)""" energy = energy.to_value(self.energy.unit) edges = self.energy.edges.to_value(self.energy.unit) self.data = np.histogram(energy, edges)[0]
@property def total_counts(self): """Total number of counts.""" return self.data.sum()
[docs] def plot( self, ax=None, energy_unit="TeV", show_poisson_errors=False, show_energy=None, **kwargs, ): """Plot as data points. kwargs are forwarded to `~matplotlib.pyplot.errorbar` Parameters ---------- ax : `~matplotlib.axis` (optional) Axis instance to be used for the plot energy_unit : str, `~astropy.units.Unit`, optional Unit of the energy axis show_poisson_errors : bool, optional Show poisson errors on the plot show_energy : `~astropy.units.Quantity`, optional Show energy, e.g. threshold, as vertical line Returns ------- ax: `~matplotlib.axis` Axis instance used for the plot """ import matplotlib.pyplot as plt ax = plt.gca() if ax is None else ax counts = self.data x = self.energy.center.to_value(energy_unit) bounds = self.energy.edges.to_value(energy_unit) xerr = [x - bounds[:-1], bounds[1:] - x] yerr = np.sqrt(counts) if show_poisson_errors else 0 kwargs.setdefault("fmt", ".") ax.errorbar(x, counts, xerr=xerr, yerr=yerr, **kwargs) if show_energy is not None: ener_val = u.Quantity(show_energy).to_value(energy_unit) ax.vlines(ener_val, 0, 1.1 * max(self.data), linestyles="dashed") ax.set_xlabel(f"Energy [{energy_unit}]") ax.set_ylabel("Counts") ax.set_xscale("log") return ax
[docs] def plot_hist(self, ax=None, energy_unit="TeV", show_energy=None, **kwargs): """Plot as histogram. kwargs are forwarded to `~matplotlib.pyplot.hist` Parameters ---------- ax : `~matplotlib.axis` (optional) Axis instance to be used for the plot energy_unit : str, `~astropy.units.Unit`, optional Unit of the energy axis show_energy : `~astropy.units.Quantity`, optional Show energy, e.g. threshold, as vertical line """ import matplotlib.pyplot as plt ax = plt.gca() if ax is None else ax kwargs.setdefault("lw", 2) kwargs.setdefault("histtype", "step") weights = self.data bins = self.energy.edges.to_value(energy_unit) x = self.energy.center.to_value(energy_unit) ax.hist(x, bins=bins, weights=weights, **kwargs) if show_energy is not None: ener_val = u.Quantity(show_energy).to_value(energy_unit) ax.vlines(ener_val, 0, 1.1 * max(self.data), linestyles="dashed") ax.set_xlabel(f"Energy [{energy_unit}]") ax.set_ylabel("Counts") ax.set_xscale("log") return ax
[docs] def plot_region(self, ax=None, **kwargs): """Plot region Parameters ---------- ax : `~astropy.vizualisation.WCSAxes` Axes to plot on. **kwargs : dict Keyword arguments forwarded to `~regions.PixelRegion.as_artist` """ import matplotlib.pyplot as plt from matplotlib.collections import PatchCollection ax = plt.gca() or ax regions = compound_region_to_list(self.region) artists = [region.to_pixel(wcs=ax.wcs).as_artist() for region in regions] patches = PatchCollection(artists, **kwargs) ax.add_collection(patches) return ax
[docs] def peek(self, figsize=(5, 10)): """Quick-look summary plots.""" import matplotlib.pyplot as plt fig, ax = plt.subplots(1, 1, figsize=figsize) self.plot_hist(ax=ax) return ax
[docs] def copy(self): """A deep copy of self.""" return copy.deepcopy(self)
[docs] def downsample(self, factor): """Downsample spectrum. Parameters ---------- factor : int Downsampling factor. Returns ------- spectrum : `~gammapy.spectrum.CountsSpectrum` Downsampled spectrum. """ from gammapy.extern.skimage import block_reduce data = block_reduce(self.data, block_size=(factor,)) energy = self.energy.downsample(factor).edges return self.__class__(energy_lo=energy[:-1], energy_hi=energy[1:], data=data)
[docs] def energy_mask(self, emin=None, emax=None): """Create a mask for a given energy range. Parameters ---------- emin, emax : `~astropy.units.Quantity` Energy range """ edges = self.energy.edges # set default values emin = emin if emin is not None else edges[0] emax = emax if emax is not None else edges[-1] return (edges[:-1] >= emin) & (edges[1:] <= emax)
def _arithmetics(self, operator, other, copy): """Perform arithmetics on maps after checking geometry consistency.""" if isinstance(other, CountsSpectrum): q = other.quantity else: q = u.Quantity(other, copy=False) out = self.copy() if copy else self out.quantity = operator(out.quantity, q) return out def __add__(self, other): return self._arithmetics(np.add, other, copy=True) def __iadd__(self, other): return self._arithmetics(np.add, other, copy=False) def __sub__(self, other): return self._arithmetics(np.subtract, other, copy=True) def __isub__(self, other): return self._arithmetics(np.subtract, other, copy=False) def __mul__(self, other): return self._arithmetics(np.multiply, other, copy=True) def __imul__(self, other): return self._arithmetics(np.multiply, other, copy=False) def __truediv__(self, other): return self._arithmetics(np.true_divide, other, copy=True) def __itruediv__(self, other): return self._arithmetics(np.true_divide, other, copy=False) def __array__(self): return self.data
class SpectrumEvaluator: """Calculate number of predicted counts (``npred``). The true and reconstructed energy binning are inferred from the provided IRFs. Parameters ---------- model : `~gammapy.modeling.models.SkyModel` Spectral model aeff : `~gammapy.irf.EffectiveAreaTable` EffectiveArea edisp : `~gammapy.irf.EDispKernel`, optional Energy dispersion kernel. livetime : `~astropy.units.Quantity` Observation duration (may be contained in aeff) e_true : `~astropy.units.Quantity`, optional Desired energy axis of the prediced counts vector if no IRFs are given Examples -------- Calculate predicted counts in a desired reconstruced energy binning .. plot:: :include-source: import numpy as np import astropy.units as u import matplotlib.pyplot as plt from gammapy.irf import EffectiveAreaTable, EDispKernel from gammapy.modeling.models import PowerLawSpectralModel, SkyModel from gammapy.spectrum import SpectrumEvaluator e_true = np.logspace(-2, 2.5, 109) * u.TeV e_reco = np.logspace(-2, 2, 73) * u.TeV aeff = EffectiveAreaTable.from_parametrization(energy=e_true) edisp = EDispKernel.from_gauss(e_true=e_true, e_reco=e_reco, sigma=0.3, bias=0) pwl = PowerLawSpectralModel(index=2.3, amplitude="2.5e-12 cm-2 s-1 TeV-1", reference="1 TeV") model = SkyModel(spectral_model=pwl) predictor = SpectrumEvaluator(model=model, aeff=aeff, edisp=edisp, livetime="1 hour") predictor.compute_npred().plot_hist() plt.show() """ def __init__(self, model, aeff=None, edisp=None, livetime=None): self.model = model self.aeff = aeff self.edisp = edisp self.livetime = livetime def compute_npred(self): e_true = self.aeff.energy.edges integral_flux = self.model.spectral_model.integral( emin=e_true[:-1], emax=e_true[1:], intervals=True ) true_counts = self.apply_aeff(integral_flux) return self.apply_edisp(true_counts) def apply_aeff(self, integral_flux): if self.aeff is not None: cts = integral_flux * self.aeff.data.data else: cts = integral_flux # Multiply with livetime if not already contained in aeff or model if cts.unit.is_equivalent("s-1"): cts *= self.livetime return cts.to("") def apply_edisp(self, true_counts): from . import CountsSpectrum if self.edisp is not None: cts = self.edisp.apply(true_counts) e_reco = self.edisp.e_reco.edges else: cts = true_counts e_reco = self.aeff.energy.edges return CountsSpectrum(data=cts, energy_lo=e_reco[:-1], energy_hi=e_reco[1:])