# Licensed under a 3-clause BSD style license - see LICENSE.rst
import logging
from collections import OrderedDict
import numpy as np
from ..utils.fitting import Fit, Datasets
from ..time import LightCurve
from ..spectrum import SpectrumDatasetOnOff, FluxPoints
from ..spectrum.models import ScaleModel
from ..utils.table import table_from_row_data
__all__ = ["LightCurveEstimator"]
log = logging.getLogger(__name__)
[docs]class LightCurveEstimator:
"""Estimate flux points for a given list of datasets, each per time bin.
Parameters
----------
datasets : list of `~gammapy.spectrum.SpectrumDataset` or `~gammapy.cube.MapDataset`
Spectrum or Map datasets.
source : str
For which source in the model to compute the flux points. Default is ''
norm_min : float
Minimum value for the norm used for the likelihood profile evaluation.
norm_max : float
Maximum value for the norm used for the likelihood profile evaluation.
norm_n_values : int
Number of norm values used for the likelihood profile.
norm_values : `numpy.ndarray`
Array of norm values to be used for the likelihood profile.
sigma : int
Sigma to use for asymmetric error computation.
sigma_ul : int
Sigma to use for upper limit computation.
reoptimize : bool
reoptimize other parameters during likelihod scan
"""
def __init__(
self,
datasets,
source="",
norm_min=0.2,
norm_max=5,
norm_n_values=11,
norm_values=None,
sigma=1,
sigma_ul=2,
reoptimize=False,
):
if not isinstance(datasets, Datasets):
datasets = Datasets(datasets)
self.datasets = datasets
if not datasets.is_all_same_type and datasets.is_all_same_shape:
raise ValueError(
"Light Curve estimation requires a list of datasets"
" of the same type and data shape."
)
dataset = self.datasets.datasets[0]
if isinstance(dataset, SpectrumDatasetOnOff):
model = dataset.model
else:
model = dataset.model[source].spectral_model
self.model = ScaleModel(model)
self.model.norm.min = 0
self.model.norm.max = 1e5
if norm_values is None:
norm_values = np.logspace(
np.log10(norm_min), np.log10(norm_max), norm_n_values
)
self.norm_values = norm_values
self.sigma = sigma
self.sigma_ul = sigma_ul
self.reoptimize = reoptimize
self.source = source
self._set_scale_model()
def _set_scale_model(self):
# set the model on all datasets
for dataset in self.datasets.datasets:
if isinstance(dataset, SpectrumDatasetOnOff):
dataset.model = self.model
else:
dataset.model[self.source].spectral_model = self.model
@property
def ref_model(self):
return self.model.model
[docs] def run(self, e_ref, e_min, e_max, steps="all"):
"""Run light curve extraction.
Normalize integral and energy flux between emin and emax.
Parameters
----------
e_ref : `~astropy.unit.Quantity`
reference energy of dnde flux normalization
e_min : `~astropy.unit.Quantity`
minimum energy of integral and energy flux normalization interval
e_max : `~astropy.unit.Quantity`
minimum energy of integral and energy flux normalization interval
steps : list of str
Which steps to execute. Available options are:
* "err": estimate symmetric error.
* "errn-errp": estimate asymmetric errors.
* "ul": estimate upper limits.
* "ts": estimate ts and sqrt(ts) values.
* "norm-scan": estimate likelihood profiles.
By default all steps are executed.
Returns
-------
lightcurve : `~gammapy.time.LightCurve`
the Light Curve object
"""
self.e_ref = e_ref
self.e_min = e_min
self.e_max = e_max
rows = []
for dataset in self.datasets.datasets:
row = {
"time_min": dataset.counts.meta["t_start"].mjd,
"time_max": dataset.counts.meta["t_stop"].mjd,
}
row.update(self.estimate_time_bin_flux(dataset, steps))
rows.append(row)
meta = OrderedDict([("SED_TYPE", "likelihood")])
table = table_from_row_data(rows=rows, meta=meta)
table = FluxPoints(table).to_sed_type("flux").table
return LightCurve(table)
[docs] def estimate_time_bin_flux(self, dataset, steps="all"):
"""Estimate flux point for a single energy group.
Parameters
----------
steps : list of str
Which steps to execute. Available options are:
* "err": estimate symmetric error.
* "errn-errp": estimate asymmetric errors.
* "ul": estimate upper limits.
* "ts": estimate ts and sqrt(ts) values.
* "norm-scan": estimate likelihood profiles.
By default all steps are executed.
Returns
-------
result : dict
Dict with results for the flux point.
"""
self.fit = Fit(dataset)
result = OrderedDict(
[
("e_ref", self.e_ref),
("e_min", self.e_min),
("e_max", self.e_max),
("ref_dnde", self.ref_model(self.e_ref)),
("ref_flux", self.ref_model.integral(self.e_min, self.e_max)),
("ref_eflux", self.ref_model.energy_flux(self.e_min, self.e_max)),
("ref_e2dnde", self.ref_model(self.e_ref) * self.e_ref ** 2),
]
)
result.update(self.estimate_norm())
if not result.pop("success"):
log.warning(
"Fit failed for time bin between {t_min} and {t_max},"
" setting NaN.".format(
t_min=dataset.counts.meta["t_start"],
t_max=dataset.counts.meta["t_stop"],
)
)
if steps == "all":
steps = ["err", "counts", "errp-errn", "ul", "ts", "norm-scan"]
if "err" in steps:
result.update(self.estimate_norm_err())
if "counts" in steps:
result.update(self.estimate_counts(dataset))
if "errp-errn" in steps:
result.update(self.estimate_norm_errn_errp())
if "ul" in steps:
result.update(self.estimate_norm_ul(dataset))
if "ts" in steps:
result.update(self.estimate_norm_ts())
if "norm-scan" in steps:
result.update(self.estimate_norm_scan())
return result
# TODO : most of the following code is copied from FluxPointsEstimator, can it be restructured?
[docs] def estimate_norm_errn_errp(self):
"""Estimate asymmetric errors for a flux point.
Returns
-------
result : dict
Dict with asymmetric errors for the flux point norm.
"""
result = self.fit.confidence(parameter=self.model.norm, sigma=self.sigma)
return {"norm_errp": result["errp"], "norm_errn": result["errn"]}
[docs] def estimate_norm_err(self):
"""Estimate covariance errors for a flux point.
Returns
-------
result : dict
Dict with symmetric error for the flux point norm.
"""
result = self.fit.covariance()
norm_err = result.parameters.error(self.model.norm)
return {"norm_err": norm_err}
[docs] def estimate_counts(self, dataset):
"""Estimate counts for the flux point.
Parameters
----------
dataset : `~gammapy.utils.fitting.Dataset`
the dataset object
Returns
-------
result : dict
Dict with an array with one entry per dataset with counts for the flux point.
"""
# TODO : use e_min and e_max interval for counts calculation
# TODO : add off counts and excess? for DatasetOnOff
# TODO : this may require a loop once we support Datasets per time bin
mask = dataset.mask
if dataset.mask_safe is not None:
mask &= dataset.mask_safe
counts = dataset.counts.data[mask].sum()
return {"counts": counts}
[docs] def estimate_norm_ul(self, dataset):
"""Estimate upper limit for a flux point.
Returns
-------
result : dict
Dict with upper limit for the flux point norm.
"""
norm = self.model.norm
# TODO: the minuit backend has convergence problems when the likelihood is not
# of parabolic shape, which is the case, when there are zero counts in the
# bin. For this case we change to the scipy backend.
counts = self.estimate_counts(dataset)["counts"]
if np.all(counts == 0):
result = self.fit.confidence(
parameter=norm,
sigma=self.sigma_ul,
backend="scipy",
reoptimize=self.reoptimize,
)
else:
result = self.fit.confidence(parameter=norm, sigma=self.sigma_ul)
return {"norm_ul": result["errp"] + norm.value}
[docs] def estimate_norm_ts(self):
"""Estimate ts and sqrt(ts) for the flux point.
Returns
-------
result : dict
Dict with ts and sqrt(ts) for the flux point.
"""
loglike = self.datasets.likelihood()
# store best fit amplitude, set amplitude of fit model to zero
self.model.norm.value = 0
self.model.norm.frozen = True
if self.reoptimize:
_ = self.fit.optimize()
loglike_null = self.datasets.likelihood()
# compute sqrt TS
ts = np.abs(loglike_null - loglike)
sqrt_ts = np.sqrt(ts)
return {"sqrt_ts": sqrt_ts, "ts": ts}
[docs] def estimate_norm_scan(self):
"""Estimate likelihood profile for the norm parameter.
Returns
-------
result : dict
Dict with norm_scan and dloglike_scan for the flux point.
"""
result = self.fit.likelihood_profile(
self.model.norm, values=self.norm_values, reoptimize=self.reoptimize
)
dloglike_scan = result["likelihood"]
return {"norm_scan": result["values"], "dloglike_scan": dloglike_scan}
[docs] def estimate_norm(self):
"""Fit norm of the flux point.
Returns
-------
result : dict
Dict with "norm" and "loglike" for the flux point.
"""
# start optimization with norm=1
self.model.norm.value = 1.0
self.model.norm.frozen = False
result = self.fit.optimize()
if result.success:
norm = self.model.norm.value
else:
norm = np.nan
return {"norm": norm, "loglike": result.total_stat, "success": result.success}