Source code for gammapy.spectrum.core

# Licensed under a 3-clause BSD style license - see LICENSE.rst
from __future__ import absolute_import, division, print_function, unicode_literals
from copy import deepcopy
from collections import OrderedDict
import numpy as np
from astropy.table import Table
from astropy.io import fits
import astropy.units as u
from .. import version
from ..utils.nddata import NDDataArray, BinnedDataAxis
from ..utils.scripts import make_path
from ..utils.fits import (
    energy_axis_to_ebounds,
    fits_table_to_table,
    ebounds_to_energy_axis,
    table_to_fits_table,
)
from ..data import EventList

__all__ = [
    'CountsSpectrum',
    'PHACountsSpectrum',
    'PHACountsSpectrumList',
]


[docs]class CountsSpectrum(object): """Generic counts spectrum. Parameters ---------- energy_lo : `~astropy.units.Quantity` Lower bin edges of energy axis energy_hi : `~astropy.units.Quantity` Upper bin edges of energy axis data : `~astropy.units.Quantity`, array-like Counts Examples -------- .. plot:: :include-source: from gammapy.spectrum import CountsSpectrum import numpy as np import astropy.units as u ebounds = np.logspace(0,1,11) * u.TeV data = np.arange(10) * u.ct spec = CountsSpectrum( energy_lo=ebounds[:-1], energy_hi=ebounds[1:], data=data, ) spec.plot(show_poisson_errors=True) """ default_interp_kwargs = dict(bounds_error=False, method='nearest') """Default interpolation kwargs""" def __init__(self, energy_lo, energy_hi, data=None, interp_kwargs=None): axes = [BinnedDataAxis(energy_lo, energy_hi, interpolation_mode='log', name='energy')] # Set data unit to counts for coherence if data is not None: if isinstance(data, u.Quantity): if data.unit.is_equivalent('ct'): pass elif data.unit.is_equivalent(u.Unit('')): data = data.value else: raise ValueError('Invalid data unit {}'.format(data.unit)) data = u.Quantity(data, 'ct') if interp_kwargs is None: interp_kwargs = self.default_interp_kwargs self.data = NDDataArray(axes=axes, data=data, interp_kwargs=interp_kwargs) @property def energy(self): return self.data.axis('energy') @classmethod
[docs] def from_hdulist(cls, hdulist, hdu1='COUNTS', hdu2='EBOUNDS'): """Read OGIP format hdulist""" counts_table = fits_table_to_table(hdulist[hdu1]) counts = counts_table['COUNTS'].data ebounds = ebounds_to_energy_axis(hdulist[hdu2]) return cls(data=counts, energy_lo=ebounds.lower_bounds, energy_hi=ebounds.upper_bounds)
@classmethod
[docs] def read(cls, filename, hdu1='COUNTS', hdu2='EBOUNDS', **kwargs): filename = make_path(filename) hdulist = fits.open(str(filename), **kwargs) try: return cls.from_hdulist(hdulist, hdu1=hdu1, hdu2=hdu2) except KeyError: msg = 'File {} does not contain HDUs "{}"'.format( filename, [hdu1, hdu2]) msg += '\n Available {}'.format([_.name for _ in hdulist]) raise ValueError(msg)
[docs] def to_table(self): """Convert to `~astropy.table.Table`. Data format specification: :ref:`gadf:ogip-pha` """ channel = np.arange(self.energy.nbins, dtype=np.int16) counts = np.array(self.data.data.value, dtype=np.int32) names = ['CHANNEL', 'COUNTS'] meta = OrderedDict() meta['name'] = 'COUNTS' return Table([channel, counts], names=names, meta=meta)
[docs] def to_hdulist(self): """Convert to `~astropy.io.fits.HDUList` This adds an ``EBOUNDS`` extension to the ``BinTableHDU`` produced by ``to_table``, in order to store the energy axis """ hdu = table_to_fits_table(self.to_table()) prim_hdu = fits.PrimaryHDU() ebounds = energy_axis_to_ebounds(self.energy.bins) return fits.HDUList([prim_hdu, hdu, ebounds])
[docs] def write(self, filename, **kwargs): filename = make_path(filename) self.to_hdulist().writeto(str(filename), **kwargs)
[docs] def fill(self, events): """Fill with list of events TODO: Move to `~gammapy.utils.nddata.NDDataArray` Parameters ---------- events: `~astropy.units.Quantity`, `gammapy.data.EventList`, List of event energies """ if isinstance(events, EventList): events = events.energy energy = events.to(self.energy.unit) binned_val = np.histogram(energy.value, self.energy.bins)[0] self.data.data = binned_val * u.ct
@property def total_counts(self): """Total number of counts """ return self.data.data.sum()
[docs] def plot(self, ax=None, energy_unit='TeV', show_poisson_errors=False, show_energy=None, **kwargs): """Plot as datapoint kwargs are forwarded to `~matplotlib.pyplot.errorbar` Parameters ---------- ax : `~matplotlib.axis` (optional) Axis instance to be used for the plot energy_unit : str, `~astropy.units.Unit`, optional Unit of the energy axis show_poisson_errors : bool, optional Show poisson errors on the plot show_energy : `~astropy.units.Quantity`, optional Show energy, e.g. threshold, as vertical line Returns ------- ax: `~matplotlib.axis` Axis instance used for the plot """ import matplotlib.pyplot as plt ax = plt.gca() if ax is None else ax counts = self.data.data.value x = self.energy.nodes.to(energy_unit).value bounds = self.energy.bins.to(energy_unit).value xerr = [x - bounds[:-1], bounds[1:] - x] yerr = np.sqrt(counts) if show_poisson_errors else 0 kwargs.setdefault('fmt', '') ax.errorbar(x, counts, xerr=xerr, yerr=yerr, **kwargs) if show_energy is not None: ener_val = u.Quantity(show_energy).to(energy_unit).value ax.vlines(ener_val, 0, 1.1 * max(self.data.data.value), linestyles='dashed') ax.set_xlabel('Energy [{0}]'.format(energy_unit)) ax.set_ylabel('Counts') ax.set_xscale('log') ax.set_ylim(0, 1.2 * max(self.data.data.value)) return ax
[docs] def plot_hist(self, ax=None, energy_unit='TeV', show_energy=None, **kwargs): """Plot as histogram kwargs are forwarded to `~matplotlib.pyplot.hist` Parameters ---------- ax : `~matplotlib.axis` (optional) Axis instance to be used for the plot energy_unit : str, `~astropy.units.Unit`, optional Unit of the energy axis show_energy : `~astropy.units.Quantity`, optional Show energy, e.g. threshold, as vertical line """ import matplotlib.pyplot as plt ax = plt.gca() if ax is None else ax kwargs.setdefault('lw', 2) kwargs.setdefault('histtype', 'step') weights = self.data.data.value bins = self.energy.bins.to(energy_unit).value[:-1] x = self.energy.nodes.to(energy_unit).value ax.hist(x, bins=bins, weights=weights, **kwargs) if show_energy is not None: ener_val = u.Quantity(show_energy).to(energy_unit).value ax.vlines(ener_val, 0, 1.1 * max(self.data.data.value), linestyles='dashed') ax.set_xlabel('Energy [{0}]'.format(energy_unit)) ax.set_ylabel('Counts') ax.set_xscale('log') return ax
[docs] def peek(self, figsize=(5, 10)): """Quick-look summary plots.""" import matplotlib.pyplot as plt fig, ax = plt.subplots(1, 1, figsize=figsize) self.plot_hist(ax=ax) return ax
[docs] def copy(self): """A deep copy of self. """ return deepcopy(self)
[docs] def rebin(self, parameter): """Rebin Parameters ---------- parameter, int Number of bins to merge Returns ------- rebinned_spectrum : `~gammapy.spectrum.CountsSpectrum` Rebinned spectrum """ if len(self.data.data) % parameter != 0: raise ValueError("Invalid rebin parameter: {}, nbins: {}".format( parameter, len(self.data.data))) # Copy to keep attributes retval = self.copy() energy = retval.energy energy.lo = energy.lo[0::parameter] energy.hi = energy.hi[parameter - 1::parameter] split_indices = np.arange(parameter, len(retval.data.data), parameter) counts_grp = np.split(retval.data.data, split_indices) counts_rebinned = np.sum(counts_grp, axis=1) retval.data.data = counts_rebinned * u.ct return retval
[docs]class PHACountsSpectrum(CountsSpectrum): """OGIP PHA equivalent The ``bkg`` flag controls wheater the PHA counts spectrum represents a background estimate or not (this slightly affectes the FITS header information when writing to disk). Parameters ---------- energy_lo : `~astropy.units.Quantity` Lower bin edges of energy axis energy_hi : `~astropy.units.Quantity` Upper bin edges of energy axis data : `~numpy.array`, list, optional Counts quality : int, array-like, optional Mask bins in safe energy range (1 = bad, 0 = good) backscal : float, array-like, optional Background scaling factor areascal : float, array-like, optional Area scaling factor is_bkg : bool, optional Background or soure spectrum, default: False meta : dict, optional Meta information, TODO: add link where possible meta info is listed """ def __init__(self, energy_lo, energy_hi, data=None, quality=None, backscal=None, areascal=None, is_bkg=False, meta=None): super(PHACountsSpectrum, self).__init__(energy_lo, energy_hi, data) if quality is None: quality = np.zeros(self.energy.nbins, dtype='i2') self.quality = quality if backscal is None: backscal = np.ones(self.energy.nbins) self.backscal = backscal if areascal is None: areascal = np.ones(self.energy.nbins) self.areascal = areascal self.is_bkg = is_bkg self.meta = meta or OrderedDict() self.meta.setdefault('CREATOR', 'Gammapy {}'.format(version.version)) self.meta.setdefault('OBS_ID', 0) @property def phafile(self): """PHA file associated with the observations""" return 'pha_obs{}.fits'.format(self.obs_id) @property def arffile(self): """ARF associated with the observations""" return self.phafile.replace('pha', 'arf') @property def rmffile(self): """RMF associated with the observations""" return self.phafile.replace('pha', 'rmf') @property def bkgfile(self): """Background PHA files associated with the observations""" return self.phafile.replace('pha', 'bkg') @property def obs_id(self): return self.meta['OBS_ID'] @obs_id.setter def obs_id(self, val): self.meta['OBS_ID'] = val @property def livetime(self): return self.meta['EXPOSURE'] * u.s @livetime.setter def livetime(self, val): self.meta['EXPOSURE'] = val.to('s').value @property def offset(self): return self.meta['OFFSET'] * u.deg @property def bins_in_safe_range(self): """Indices of bins within the energy thresholds""" idx = np.where(np.array(self.quality) == 0)[0] return idx @property def counts_in_safe_range(self): """Counts with bins outside safe range set to 0""" data = self.data.data.copy() data[np.nonzero(self.quality)] = 0 return data @property def lo_threshold(self): """Low energy threshold of the observation (lower bin edge)""" idx = self.bins_in_safe_range[0] return self.energy.lo[idx] @lo_threshold.setter def lo_threshold(self, thres): idx = np.where(self.energy.lo < thres)[0] self.quality[idx] = 1 @property def hi_threshold(self): """High energy threshold of the observation (upper bin edge)""" idx = self.bins_in_safe_range[-1] return self.energy.hi[idx] @hi_threshold.setter def hi_threshold(self, thres): idx = np.where(self.energy.lo > thres)[0] if len(idx) != 0: idx = np.insert(idx, 0, idx[0] - 1) self.quality[idx] = 1
[docs] def rebin(self, parameter): """Rebin see `~gammapy.spectrum.CountsSpectrum`, this function treats the quality vector correctly """ retval = super(PHACountsSpectrum, self).rebin(parameter) split_indices = np.arange(parameter, len(self.data.data), parameter) quality_grp = np.split(retval.quality, split_indices) quality_summed = np.sum(quality_grp, axis=1) # Exclude groups where not all bins are within the safe threshold condition = (quality_summed == parameter) quality_rebinned = np.where(condition, np.ones(len(retval.data.data)), np.zeros(len(retval.data.data))) retval.quality = np.array(quality_rebinned, dtype=int) # if backscal is not the same in all channels cannot merge if not np.isscalar(retval.backscal): if not np.isclose(np.diff(retval.backscal), 0).all(): raise ValueError('Cannot merge energy dependent backscal') else: retval.meta.backscal = retval.backscal[0] * np.ones(retval.energy.nbins) return retval
@property def _backscal_array(self): """Helper function to always return backscal as an array""" if np.isscalar(self.backscal): return np.ones(self.energy.nbins) * self.backscal else: return self.backscal
[docs] def to_table(self): """Write""" table = super(PHACountsSpectrum, self).to_table() table['QUALITY'] = self.quality table['BACKSCAL'] = self._backscal_array table['AREASCAL'] = self.areascal meta = OrderedDict() meta['name'] = 'SPECTRUM' meta['hduclass'] = 'OGIP' meta['hduclas1'] = 'SPECTRUM' meta['corrscal'] = '' meta['chantype'] = 'PHA' meta['detchans'] = self.energy.nbins meta['filter'] = 'None' meta['corrfile'] = '' meta['poisserr'] = True meta['hduclas3'] = 'COUNT' meta['hduclas4'] = 'TYPE:1' meta['lo_thres'] = self.lo_threshold.to("TeV").value meta['hi_thres'] = self.hi_threshold.to("TeV").value if not self.is_bkg: if self.rmffile is not None: meta['respfile'] = self.rmffile meta['backfile'] = self.bkgfile meta['ancrfile'] = self.arffile meta['hduclas2'] = 'TOTAL' else: meta['hduclas2'] = 'BKG' meta.update(self.meta) table.meta = meta return table
@classmethod
[docs] def from_hdulist(cls, hdulist, hdu1='SPECTRUM', hdu2='EBOUNDS'): """Read""" counts_table = fits_table_to_table(hdulist[hdu1]) ebounds = fits_table_to_table(hdulist[2]) emin = ebounds['E_MIN'].quantity emax = ebounds['E_MAX'].quantity kwargs = dict( data=counts_table['COUNTS'] * u.ct, backscal=counts_table['BACKSCAL'].data, energy_lo=emin, energy_hi=emax, quality=counts_table['QUALITY'].data, meta=counts_table.meta ) if hdulist[1].header['HDUCLAS2'] == 'BKG': kwargs['is_bkg'] = True return cls(**kwargs)
@classmethod
[docs] def read(cls, filename, hdu1='SPECTRUM', hdu2='EBOUNDS', **kwargs): filename = make_path(filename) hdulist = fits.open(str(filename), **kwargs) try: return cls.from_hdulist(hdulist, hdu1=hdu1, hdu2=hdu2) except KeyError: msg = 'File {} does not contain HDUs "{}"'.format( filename, [hdu1, hdu2]) msg += '\n Available {}'.format([_.name for _ in hdulist]) raise ValueError(msg)
[docs] def to_sherpa(self, name): """Return `~sherpa.astro.data.DataPHA` Parameters ---------- name : str Instance name """ from sherpa.utils import SherpaFloat from sherpa.astro.data import DataPHA table = self.to_table() # Workaround to avoid https://github.com/sherpa/sherpa/issues/248 if np.isscalar(self.backscal): backscal = self.backscal else: backscal = self.backscal.copy() if np.allclose(backscal.mean(), backscal): backscal = backscal[0] return DataPHA( name=name, channel=(table['CHANNEL'].data + 1).astype(SherpaFloat), counts=table['COUNTS'].data.astype(SherpaFloat), quality=table['QUALITY'].data, exposure=self.livetime.to('s').value, backscal=backscal, areascal=1., syserror=None, staterror=None, grouping=None, )
[docs]class PHACountsSpectrumList(list): """List of `~gammapy.spectrum.PHACountsSpectrum` All spectra must have the same energy binning. This represent the PHA type II data format. See https://heasarc.gsfc.nasa.gov/docs/heasarc/ofwg/docs/spectra/ogip_92_007/node8.html """
[docs] def write(self, outdir, **kwargs): """Write to file""" outdir = make_path(outdir) self.to_hdulist().writeto(str(outdir), **kwargs)
[docs] def to_hdulist(self): """Create `~astropy.fits.HDUList`""" prim_hdu = fits.PrimaryHDU() hdu = table_to_fits_table(self.to_table()) ebounds = energy_axis_to_ebounds(self[0].energy.bins) return fits.HDUList([prim_hdu, hdu, ebounds])
[docs] def to_table(self): """Create `~astropy.table.Table` """ is_bkg = self[0].is_bkg nbins = self[0].energy.nbins spec_num = np.empty([len(self), 1], dtype=np.int16) channel = np.empty([len(self), nbins], dtype=np.int16) counts = np.empty([len(self), nbins], dtype=np.int32) quality = np.empty([len(self), nbins], dtype=np.int32) backscal = np.empty([len(self), nbins], dtype=np.int32) backfile = list() for idx, pha in enumerate(self): t = pha.to_table() spec_num[idx] = pha.obs_id channel[idx] = t['CHANNEL'].data counts[idx] = t['COUNTS'].data quality[idx] = t['QUALITY'].data backscal[idx] = t['BACKSCAL'].data backfile.append('bkg.fits[{}]'.format(idx)) names = ['SPEC_NUM', 'CHANNEL', 'COUNTS', 'QUALITY', 'BACKSCAL'] meta = self[0].to_table().meta meta['hduclas4'] = 'TYPE:II' meta['ancrfile'] = 'arf.fits' meta['respfile'] = 'rmf.fits' meta.pop('OBS_ID') table = Table([spec_num, channel, counts, quality, backscal], names=names, meta=meta) if not is_bkg: table.meta.pop('backfile') table['BACKFILE'] = backfile return table
@classmethod
[docs] def read(cls, filename): """Read from file""" filename = make_path(filename) hdulist = fits.open(str(filename)) speclist = cls.from_hdulist(hdulist) return speclist
@classmethod
[docs] def from_hdulist(cls, hdulist): """Create from `~astropy.fits.HDUList`""" energy = ebounds_to_energy_axis(hdulist[2]) kwargs = dict( energy_lo=energy[:-1], energy_hi=energy[1:], ) if hdulist[1].header['HDUCLAS2'] == 'BKG': kwargs['is_bkg'] = True counts_table = fits_table_to_table(hdulist[1]) speclist = cls() for row in counts_table: kwargs['data'] = row['COUNTS'] * u.ct kwargs['backscal'] = row['BACKSCAL'] kwargs['quality'] = row['QUALITY'] spec = PHACountsSpectrum(meta=dict(hdulist[1].header), **kwargs) spec.obs_id = row['SPEC_NUM'] speclist.append(spec) return speclist