# Licensed under a 3-clause BSD style license - see LICENSE.rst
import logging
import copy
import numpy as np
import astropy.units as u
from ..utils.fitting import Fit, Parameters, Dataset
from .. import stats
from ..utils.random import get_random_state
from .core import CountsSpectrum
from .utils import CountsPredictor
from .observation import SpectrumObservationList, SpectrumObservation
__all__ = ["SpectrumFit", "SpectrumDataset"]
log = logging.getLogger(__name__)
[docs]class SpectrumDataset(Dataset):
"""Compute spectral model fit statistic on a CountsSpectrum.
Parameters
----------
model : `~gammapy.spectrum.models.SpectralModel`
Fit model
counts : `~gammapy.spectrum.CountsSpectrum`
Counts spectrum
livetime : float
Livetime
mask : `~numpy.ndarray`
Mask to apply to the likelihood.
aeff : `~gammapy.irf.EffectiveAreaTable`
Effective area
edisp : `~gammapy.irf.EnergyDispersion`
Energy dispersion
background : `~gammapy.spectrum.CountsSpectrum`
Background to use for the fit.
"""
def __init__(
self,
model,
counts=None,
livetime=None,
mask=None,
aeff=None,
edisp=None,
background=None,
):
if mask is not None and mask.dtype != np.dtype("bool"):
raise ValueError("mask data must have dtype bool")
self.model = model
self.counts = counts
self.livetime = livetime
self.mask = mask
self.aeff = aeff
self.edisp = edisp
self.background = background
self.parameters = Parameters(self.model.parameters.parameters)
if edisp is None:
self.predictor = CountsPredictor(
model=self.model,
livetime=self.livetime,
aeff=self.aeff,
e_true=self.counts.energy.bins,
)
else:
self.predictor = CountsPredictor(
model=self.model,
aeff=self.aeff,
edisp=self.edisp,
livetime=self.livetime,
)
@property
def data_shape(self):
"""Shape of the counts data"""
return self.counts.data.shape
[docs] def npred(self):
"""Returns npred map (model + background)"""
self.predictor.run()
model_npred = self.predictor.npred.data.data
back_npred = self.background.data.data
total_npred = model_npred + back_npred
return total_npred
[docs] def likelihood_per_bin(self):
"""Likelihood per bin given the current model parameters"""
return stats.cash(n_on=self.counts.data.data, mu_on=self.npred())
[docs] def likelihood(self, parameters, mask=None):
"""Total likelihood given the current model parameters.
Parameters
----------
mask : `~numpy.ndarray`
Mask to be combined with the dataset mask.
"""
if self.mask is None and mask is None:
stat = self.likelihood_per_bin()
elif self.mask is None:
stat = self.likelihood_per_bin()[mask]
elif mask is None:
stat = self.likelihood_per_bin()[self.mask]
else:
stat = self.likelihood_per_bin()[mask & self.mask]
return np.sum(stat, dtype=np.float64)
[docs] def fake(self, random_state="random-seed"):
"""Simulate a fake `~gammapy.spectrum.CountsSpectrum`.
Parameters
----------
random_state : {int, 'random-seed', 'global-rng', `~numpy.random.RandomState`}
Defines random number generator initialisation.
Passed to `~gammapy.utils.random.get_random_state`.
Returns
-------
spectrum : `~gammapy.spectrum.CountsSpectrum`
the fake count spectrum
"""
random_state = get_random_state(random_state)
data = random_state.poisson(self.npred())
ebounds = self.counts.energy.bins
return CountsSpectrum(ebounds[:-1], ebounds[1:], data)
[docs]class SpectrumFit(Fit):
"""Orchestrate a 1D counts spectrum fit.
After running the :func:`~gammapy.spectrum.SpectrumFit.run` method, the fit
results are available in :func:`~gammapy.spectrum.SpectrumFit.result`. For usage
examples see :ref:`spectral_fitting`
Parameters
----------
obs_list : `~gammapy.spectrum.SpectrumObservationList`, `~gammapy.spectrum.SpectrumObservation`
Observation(s) to fit
model : `~gammapy.spectrum.models.SpectralModel`
Source model with initial parameter values. Should return counts if
``forward_folded`` is False and a flux otherwise
stat : {'wstat', 'cash'}
Fit statistic
forward_folded : bool, default: True
Fold ``model`` with the IRFs given in ``obs_list``
fit_range : tuple of `~astropy.units.Quantity`
The intersection between the fit range and the observation thresholds will be used.
If you want to control which bins are taken into account in the fit for each
observation, use :func:`~gammapy.spectrum.PHACountsSpectrum.quality`
"""
def __init__(
self, obs_list, model, stat="wstat", forward_folded=True, fit_range=None
):
self.obs_list = obs_list
self._model = model
self.stat = stat
self.forward_folded = forward_folded
self.fit_range = fit_range
self._predicted_counts = None
self._statval = None
self._result = None
self._check_valid_fit()
self._apply_fit_range()
def __str__(self):
ss = self.__class__.__name__
ss += "\nSource model {}".format(self._model.__class__.__name__)
ss += "\nStat {}".format(self.stat)
ss += "\nForward Folded {}".format(self.forward_folded)
ss += "\nFit range {}".format(self.fit_range)
return ss
@property
def obs_list(self):
"""Observations participating in the fit"""
return self._obs_list
@obs_list.setter
def obs_list(self, obs_list):
if isinstance(obs_list, SpectrumObservation):
obs_list = SpectrumObservationList([obs_list])
self._obs_list = SpectrumObservationList(obs_list)
@property
def bins_in_fit_range(self):
"""Bins participating in the fit for each observation."""
return self._bins_in_fit_range
@property
def predicted_counts(self):
"""Current value of predicted counts.
For each observation a tuple to counts for the on and off region is
returned.
"""
return self._predicted_counts
@property
def statval(self):
"""Current value of statval.
For each observation the statval per bin is returned.
"""
return self._statval
@property
def fit_range(self):
"""Fit range."""
return self._fit_range
@fit_range.setter
def fit_range(self, fit_range):
self._fit_range = fit_range
self._apply_fit_range()
@property
def true_fit_range(self):
"""True fit range for each observation.
True fit range is the fit range set in the
`~gammapy.spectrum.SpectrumFit` with observation threshold taken into
account.
"""
true_range = []
for binrange, obs in zip(self.bins_in_fit_range, self.obs_list):
idx = np.where(binrange)[0]
if len(idx) == 0:
true_range.append(None)
continue
e_min = obs.e_reco[idx[0]]
e_max = obs.e_reco[idx[-1] + 1]
fit_range = u.Quantity((e_min, e_max))
true_range.append(fit_range)
return true_range
def _apply_fit_range(self):
"""Mark bins within desired fit range for each observation."""
self._bins_in_fit_range = []
for obs in self.obs_list:
# Take into account fit range
energy = obs.e_reco
valid_range = np.zeros(energy.nbins)
if self.fit_range is not None:
precision = 1e-3 # to avoid floating round precision
idx_lo = np.where(energy * (1 + precision) < self.fit_range[0])[0]
valid_range[idx_lo] = 1
idx_hi = np.where(energy[:-1] * (1 - precision) > self.fit_range[1])[0]
if len(idx_hi) != 0:
idx_hi = np.insert(idx_hi, 0, idx_hi[0] - 1)
valid_range[idx_hi] = 1
# Take into account thresholds
try:
quality = obs.on_vector.quality
except AttributeError:
quality = np.zeros(obs.e_reco.nbins)
intersection = np.logical_and(1 - quality, 1 - valid_range)
self._bins_in_fit_range.append(intersection)
[docs] def predict_counts(self):
"""Predict counts for all observations.
The result is stored as ``predicted_counts`` attribute.
"""
predicted_counts = []
for obs in self.obs_list:
mu_sig = self._predict_counts_helper(obs, self._model, self.forward_folded)
predicted_counts.append(mu_sig)
self._predicted_counts = predicted_counts
def _predict_counts_helper(self, obs, model, forward_folded=True):
"""Predict counts for one observation.
Parameters
----------
obs : `~gammapy.spectrum.SpectrumObservation`
Response functions
model : `~gammapy.spectrum.models.SpectralModel`
Source or background model
forward_folded : bool, default: True
Fold model with IRFs
Returns
-------
predicted_counts : `numpy.ndarray`
Predicted counts for one observation
"""
predictor = CountsPredictor(model=model)
if forward_folded:
predictor.aeff = obs.aeff
predictor.edisp = obs.edisp
else:
predictor.e_true = obs.e_reco
predictor.livetime = obs.livetime
predictor.run()
counts = predictor.npred.data.data
# Check count unit (~unit of model amplitude)
if counts.unit.is_equivalent(""):
counts = counts.value
else:
raise ValueError("Predicted counts {}".format(counts))
# Apply AREASCAL column
counts *= obs.on_vector.areascal
return counts
[docs] def calc_statval(self):
"""Calc statistic for all observations.
The result is stored as attribute ``statval``, bin outside the fit
range are set to 0.
"""
statval = []
for obs, npred in zip(self.obs_list, self.predicted_counts):
on_stat = self._calc_statval_helper(obs, npred)
statval.append(on_stat)
self._statval = statval
self._restrict_statval()
def _calc_statval_helper(self, obs, prediction):
"""Calculate ``statval`` for one observation.
Parameters
----------
obs : `~gammapy.spectrum.SpectrumObservation`
Measured counts
prediction : tuple of `~numpy.ndarray`
Predicted counts
Returns
-------
statsval : tuple of `~numpy.ndarray`
Statval
"""
if self.stat == "cash":
return stats.cash(n_on=obs.on_vector.data.data.value, mu_on=prediction)
elif self.stat == "cstat":
return stats.cstat(n_on=obs.on_vector.data.data.value, mu_on=prediction)
elif self.stat == "wstat":
on_stat_ = stats.wstat(
n_on=obs.on_vector.data.data.value,
n_off=obs.off_vector.data.data.value,
alpha=obs.alpha,
mu_sig=prediction,
)
return np.nan_to_num(on_stat_)
else:
raise NotImplementedError("{}".format(self.stat))
[docs] def total_stat(self, parameters):
"""Statistic summed over all bins and all observations.
This is the likelihood function that is passed to the optimizers
Parameters
----------
parameters : `~gammapy.utils.fitting.Parameters`
Model parameters
"""
self.predict_counts()
self.calc_statval()
total_stat = np.sum([np.sum(v) for v in self.statval], dtype=np.float64)
return total_stat
def _restrict_statval(self):
"""Apply valid fit range to statval.
"""
for statval, valid_range in zip(self.statval, self.bins_in_fit_range):
# Find bins outside safe range
idx = np.where(np.invert(valid_range))[0]
statval[idx] = 0
def _check_valid_fit(self):
"""Helper function to give useful error messages."""
# Assume that settings are the same for all observations
test_obs = self.obs_list[0]
irfs_exist = test_obs.aeff is not None or test_obs.edisp is not None
if self.forward_folded and not irfs_exist:
raise ValueError("IRFs required for forward folded fit")
if self.stat == "wstat" and self.obs_list[0].off_vector is None:
raise ValueError("Off vector required for WStat fit")
try:
test_obs.livetime
except KeyError:
raise ValueError("No observation livetime given")
@property
def result(self):
"""Bundle fit results into `~gammapy.spectrum.SpectrumFitResult`.
Parameters
----------
parameters : `~gammapy.utils.modeling.Parameters`
Best fit parameters
"""
from . import SpectrumFitResult
# run again with best fit parameters
statname = self.stat
results = []
for idx, obs in enumerate(self.obs_list):
fit_range = self.true_fit_range[idx]
statval = np.sum(self.statval[idx])
stat_per_bin = self.statval[idx]
npred = copy.deepcopy(self.predicted_counts[idx])
results.append(
SpectrumFitResult(
model=self._model,
fit_range=fit_range,
statname=statname,
statval=statval,
stat_per_bin=stat_per_bin,
npred=npred,
obs=obs,
)
)
return results