# Licensed under a 3-clause BSD style license - see LICENSE.rst
from __future__ import absolute_import, division, print_function, unicode_literals
import numpy as np
from astropy.units import Quantity

__all__ = [
    'LogEnergyAxis',
    'CountsPredictor',
    'integrate_spectrum',
]


class LogEnergyAxis(object):
    """
    Log energy axis.

    Defines a transformation between:

    * ``energy = 10 ** x``
    * ``x = log10(energy)``
    * ``pix`` in the range [0, ..., len(x)] via linear interpolation of the ``x`` array,
      e.g. ``pix=0`` corresponds to ``x[0]``
      and ``pix=0.3`` is ``0.7 * x[0] + 0.3 * x[1]``

    .. note::
        The `specutils.Spectrum1DLookupWCS <http://specutils.readthedocs.io/en/latest/api/specutils.wcs.specwcs.Spectrum1DLookupWCS.html>`__
        class is similar (only that it doesn't include the ``log`` transformation and the API is different).
        Also see this Astropy feature request: https://github.com/astropy/astropy/issues/2362

    Parameters
    ----------
    energy : `~astropy.units.Quantity`
        Energy array
    mode : {'center', 'edges'}
        Whether the energy array represents the values at the center or edges of
        the pixels.
    """

    def __init__(self, energy, mode='center'):
        from scipy.interpolate import RegularGridInterpolator

        if mode == 'center':
            z = np.arange(len(energy))
        elif mode == 'edges':
            z = np.arange(len(energy)) - 0.5
        else:
            raise ValueError("Not a valid mode: {!r}".format(mode))

        self.mode = mode
        self._eunit = energy.unit

        log_e = np.log(energy.value)
        kwargs = dict(bounds_error=False, fill_value=None, method='linear')
        self._z_to_log_e = RegularGridInterpolator((z,), log_e, **kwargs)
        self._log_e_to_z = RegularGridInterpolator((log_e,), z, **kwargs)

    def wcs_world2pix(self, energy):
        """
        Convert energy to pixel coordinates.

        Parameters
        ----------
        energy : `~astropy.units.Quantity`
            Energy coordinate.
        """
        log_e = np.log(energy.to(self._eunit).value)
        log_e = np.atleast_1d(log_e)
        return self._log_e_to_z(log_e)

    def wcs_pix2world(self, z):
        """
        Convert pixel to energy coordinates.

        Parameters
        ----------
        z : float
            Pixel coordinate
        """
        z = np.atleast_1d(z)
        log_e = self._z_to_log_e(z)
        return np.exp(log_e) * self._eunit
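

# Illustrative usage sketch (not part of the original module): a minimal
# round trip through `LogEnergyAxis`, using made-up energy values. The helper
# name ``_example_log_energy_axis`` is hypothetical.
def _example_log_energy_axis():
    """Show the energy <-> pixel round trip of `LogEnergyAxis`."""
    energy = Quantity(np.logspace(-1, 2, 4), 'TeV')  # 0.1, 1, 10, 100 TeV
    axis = LogEnergyAxis(energy, mode='center')
    pix = axis.wcs_world2pix(Quantity(1, 'TeV'))  # -> [1.], i.e. the second node
    energy_back = axis.wcs_pix2world(pix)         # -> [1.] TeV
    return pix, energy_back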


class CountsPredictor(object):
    """Calculate number of predicted counts (``npred``).

    The true and reconstructed energy binning are inferred from the provided IRFs.

    Parameters
    ----------
    model : `~gammapy.spectrum.models.SpectralModel`
        Spectral model
    aeff : `~gammapy.irf.EffectiveAreaTable`
        Effective area
    edisp : `~gammapy.irf.EnergyDispersion`, optional
        Energy dispersion
    livetime : `~astropy.units.Quantity`
        Observation duration (may be contained in aeff)
    e_true : `~astropy.units.Quantity`, optional
        Desired energy axis of the predicted counts vector if no IRFs are given

    Examples
    --------
    Calculate predicted counts in a desired reconstructed energy binning

    .. plot::
        :include-source:

        from gammapy.irf import EnergyDispersion, EffectiveAreaTable
        from gammapy.spectrum import models, CountsPredictor
        import numpy as np
        import astropy.units as u
        import matplotlib.pyplot as plt

        e_true = np.logspace(-2, 2.5, 109) * u.TeV
        e_reco = np.logspace(-2, 2, 73) * u.TeV

        aeff = EffectiveAreaTable.from_parametrization(energy=e_true)
        edisp = EnergyDispersion.from_gauss(e_true=e_true, e_reco=e_reco,
                                            sigma=0.3, bias=0)

        model = models.PowerLaw(index=2.3,
                                amplitude=2.5 * 1e-12 * u.Unit('cm-2 s-1 TeV-1'),
                                reference=1 * u.TeV)

        livetime = 1 * u.h

        predictor = CountsPredictor(model=model,
                                    aeff=aeff,
                                    edisp=edisp,
                                    livetime=livetime)
        predictor.run()
        predictor.npred.plot_hist()
        plt.show()
    """

    def __init__(self, model, aeff=None, edisp=None, livetime=None, e_true=None):
        self.model = model
        self.aeff = aeff
        self.edisp = edisp
        self.livetime = livetime
        self.e_true = e_true
        self.e_reco = None

        self.true_flux = None
        self.true_counts = None
        self.npred = None

    def run(self):
        """Run all steps: integrate the model, apply the effective area and the energy dispersion."""
        self.integrate_model()
        self.apply_aeff()
        self.apply_edisp()

    def integrate_model(self):
        """Integrate model in true energy space"""
        if self.aeff is not None:
            # TODO: True energy is converted to model amplitude unit. See issue 869
            ref_unit = None
            try:
                for unit in self.model.parameters['amplitude'].quantity.unit.bases:
                    if unit.is_equivalent('eV'):
                        ref_unit = unit
            except IndexError:
                ref_unit = 'TeV'
            self.e_true = self.aeff.energy.bins.to(ref_unit)
        else:
            if self.e_true is None:
                raise ValueError("No true energy binning given")

        self.true_flux = self.model.integral(emin=self.e_true[:-1],
                                             emax=self.e_true[1:],
                                             intervals=True)

    def apply_aeff(self):
        """Multiply the integrated flux with the effective area (and livetime if needed)."""
        if self.aeff is not None:
            cts = self.true_flux * self.aeff.data.data
            if cts.unit.is_equivalent('s-1'):
                cts *= self.livetime
        else:
            cts = self.true_flux

        self.true_counts = cts.to('')

    def apply_edisp(self):
        """Apply the energy dispersion to the true counts to obtain ``npred``."""
        from . import CountsSpectrum

        if self.edisp is not None:
            cts = self.edisp.apply(self.true_counts)
            self.e_reco = self.edisp.e_reco.bins
        else:
            cts = self.true_counts
            self.e_reco = self.e_true

        self.npred = CountsSpectrum(data=cts,
                                    energy_lo=self.e_reco[:-1],
                                    energy_hi=self.e_reco[1:])
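

# Illustrative sketch (not part of the original module): the individual steps of
# ``CountsPredictor.run`` can also be called one by one to inspect intermediate
# results. This assumes the same IRF and model helpers as the class docstring
# example; the helper name ``_example_counts_predictor_steps`` is hypothetical.
def _example_counts_predictor_steps():
    """Run the npred computation step by step and return the intermediates."""
    import astropy.units as u
    from gammapy.irf import EnergyDispersion, EffectiveAreaTable
    from gammapy.spectrum import models

    e_true = np.logspace(-2, 2.5, 109) * u.TeV
    e_reco = np.logspace(-2, 2, 73) * u.TeV
    aeff = EffectiveAreaTable.from_parametrization(energy=e_true)
    edisp = EnergyDispersion.from_gauss(e_true=e_true, e_reco=e_reco,
                                        sigma=0.3, bias=0)
    model = models.PowerLaw(index=2.3,
                            amplitude=2.5e-12 * u.Unit('cm-2 s-1 TeV-1'),
                            reference=1 * u.TeV)

    predictor = CountsPredictor(model=model, aeff=aeff, edisp=edisp,
                                livetime=1 * u.h)
    predictor.integrate_model()  # fills ``true_flux`` (integral flux per true-energy bin)
    predictor.apply_aeff()       # fills ``true_counts`` (expected counts in true energy)
    predictor.apply_edisp()      # fills ``npred`` (counts in reconstructed energy)
    return predictor.true_flux, predictor.true_counts, predictor.npred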


def integrate_spectrum(func, xmin, xmax, ndecade=100, intervals=False):
    """
    Integrate a 1d function using the log-log trapezoidal rule.

    If scalar values for ``xmin`` and ``xmax`` are passed, an oversampled grid
    is generated using the ``ndecade`` keyword argument. If ``xmin`` and
    ``xmax`` arrays are passed, no oversampling is performed and the integral
    is computed on the provided grid.

    Parameters
    ----------
    func : callable
        Function to integrate.
    xmin : `~astropy.units.Quantity` or array-like
        Integration range minimum
    xmax : `~astropy.units.Quantity` or array-like
        Integration range maximum
    ndecade : int, optional
        Number of grid points per decade used for the integration.
        Default: 100.
    intervals : bool, optional
        If True, return the integral in each grid interval instead of the
        total. Default: False.
    """
    is_quantity = False
    if isinstance(xmin, Quantity):
        unit = xmin.unit
        xmin = xmin.value
        xmax = xmax.to(unit).value
        is_quantity = True

    if np.isscalar(xmin):
        logmin = np.log10(xmin)
        logmax = np.log10(xmax)
        n = int((logmax - logmin) * ndecade)
        x = np.logspace(logmin, logmax, n)
    else:
        x = np.append(xmin, xmax[-1])

    if is_quantity:
        x = x * unit

    y = func(x)

    val = _trapz_loglog(y, x, intervals=intervals)

    return val
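

# Illustrative sketch (not part of the original module): integrate a power law
# E^-2 between 1 and 10 TeV and compare with the analytic result
# \int_1^10 E^-2 dE = 1 - 1/10 = 0.9. The helper name
# ``_example_integrate_spectrum`` is hypothetical.
def _example_integrate_spectrum():
    """Check `integrate_spectrum` against the analytic power-law integral."""
    def power_law(x):
        return x ** -2

    emin = Quantity(1, 'TeV')
    emax = Quantity(10, 'TeV')
    result = integrate_spectrum(power_law, emin, emax, ndecade=100)
    # The log-log trapezoidal rule is exact for pure power laws,
    # so ``result`` is 0.9 / TeV up to floating point rounding.
    return result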


# This function is copied over from https://github.com/zblz/naima/blob/master/naima/utils.py#L261
# and slightly modified to allow use with the uncertainties package
def _trapz_loglog(y, x, axis=-1, intervals=False):
    """
    Integrate ``y`` over ``x`` along the given axis using the composite
    trapezoidal rule in log-log space.

    Parameters
    ----------
    y : array_like
        Input array to integrate.
    x : array_like, optional
        Independent variable to integrate over.
    axis : int, optional
        Axis along which to integrate.
    intervals : bool, optional
        If True, return the integral in each interval (an array with the shape
        of ``x`` along ``axis``) instead of the total. Default: False.

    Returns
    -------
    trapz : float
        Definite integral as approximated by the trapezoidal rule in log-log space.
    """
    log10 = np.log10

    try:
        y_unit = y.unit
        y = y.value
    except AttributeError:
        y_unit = 1.
    try:
        x_unit = x.unit
        x = x.value
    except AttributeError:
        x_unit = 1.

    y = np.asanyarray(y)
    x = np.asanyarray(x)

    slice1 = [slice(None)] * y.ndim
    slice2 = [slice(None)] * y.ndim
    slice1[axis] = slice(None, -1)
    slice2[axis] = slice(1, None)
    # Indexing with lists is deprecated in recent numpy versions, so use tuples
    slice1, slice2 = tuple(slice1), tuple(slice2)

    # arrays with uncertainties contain objects
    if y.dtype == 'O':
        from uncertainties.unumpy import log10
        # uncertainties.unumpy.log10 can't deal with tiny values, see
        # https://github.com/gammapy/gammapy/issues/687, so we filter out the
        # values here. As the values are so small it doesn't affect the final
        # result. The sqrt is taken to create a margin, because of the later
        # division y[slice2] / y[slice1]
        valid = y > np.sqrt(np.finfo(float).tiny)
        x, y = x[valid], y[valid]

    if x.ndim == 1:
        shape = [1] * y.ndim
        shape[axis] = x.shape[0]
        x = x.reshape(shape)

    with np.errstate(invalid='ignore', divide='ignore'):
        # Compute the power law indices in each integration bin
        b = log10(y[slice2] / y[slice1]) / log10(x[slice2] / x[slice1])

        # if local powerlaw index is -1, use \int 1/x = log(x); otherwise use
        # normal powerlaw integration
        trapzs = np.where(
            np.abs(b + 1.) > 1e-10, (y[slice1] * (
                x[slice2] * (x[slice2] / x[slice1]) ** b - x[slice1])) / (b + 1),
            x[slice1] * y[slice1] * np.log(x[slice2] / x[slice1]))

    tozero = (y[slice1] == 0.) + (y[slice2] == 0.) + (x[slice1] == x[slice2])
    trapzs[tozero] = 0.

    if intervals:
        return trapzs * x_unit * y_unit

    ret = np.add.reduce(trapzs, axis) * x_unit * y_unit

    return ret
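

# Illustrative sketch (not part of the original module): the log-log trapezoidal
# rule is exact for pure power laws, so even a coarse grid reproduces the
# analytic integral of x^-2 over [1, 100], which is 1 - 1/100 = 0.99. The helper
# name ``_example_trapz_loglog`` is hypothetical.
def _example_trapz_loglog():
    """Compare `_trapz_loglog` on a coarse grid with the analytic result."""
    x = np.logspace(0, 2, 5)  # only 5 grid points over two decades
    y = x ** -2
    total = _trapz_loglog(y, x)  # -> 0.99 (up to floating point rounding)
    per_bin = _trapz_loglog(y, x, intervals=True)  # integral in each of the 4 bins
    return total, per_bin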