# Licensed under a 3-clause BSD style license - see LICENSE.rst
import logging
from itertools import repeat
import numpy as np
import scipy.stats as stats
from astropy import units as u
from astropy.table import Table
import gammapy.utils.parallel as parallel
from gammapy.datasets import Datasets
from gammapy.datasets.utils import set_and_restore_mask_fit
from gammapy.datasets.actors import DatasetsActor
from gammapy.datasets.flux_points import _get_reference_model
from gammapy.maps import MapAxis, Map
from gammapy.modeling import Fit, Sampler, Parameters, Parameter
from gammapy.modeling.models import (
Models,
PowerLawNormSpectralModel,
TemplateNPredModel,
UniformPrior,
)
from ..flux import FluxEstimator
from .core import FluxPoints
# Module-level logger used for warnings emitted during flux-point estimation.
log = logging.getLogger(__name__)
# Public API of this module.
__all__ = ["FluxPointsEstimator", "FluxCollectionEstimator"]
class FluxPointsEstimator(FluxEstimator, parallel.ParallelMixin):
    """Flux points estimator.

    Estimates flux points for a given list of datasets, energies and spectral model.

    To estimate the flux point the amplitude of the reference spectral model is
    fitted within the energy range defined by the energy group. This is done for
    each group independently. The amplitude is re-normalized using the "norm" parameter,
    which specifies the deviation of the flux from the reference model in this
    energy group. See https://gamma-astro-data-formats.readthedocs.io/en/latest/spectra/binned_likelihoods/index.html
    for details.

    The method is also described in the `Fermi-LAT catalog paper <https://ui.adsabs.harvard.edu/abs/2015ApJS..218...23A>`__
    or the `H.E.S.S. Galactic Plane Survey paper <https://ui.adsabs.harvard.edu/abs/2018A%26A...612A...1H>`__

    Parameters
    ----------
    source : str or int
        For which source in the model to compute the flux points.
    n_sigma : float, optional
        Number of sigma to use for asymmetric error computation. Must be a positive value.
        Default is 1.
    n_sigma_ul : float, optional
        Number of sigma to use for upper limit computation. Must be a positive value.
        Default is 2.
    n_sigma_sensitivity : float, optional
        Sigma to use for sensitivity computation. Must be a positive value.
        Default is 5.
    selection_optional : list of str, optional
        Which additional quantities to estimate. Available options are:

        * "all": all the optional steps are executed.
        * "errn-errp": estimate asymmetric errors on flux.
        * "ul": estimate upper limits.
        * "scan": estimate fit statistic profiles.
        * "sensitivity": estimate sensitivity for a given significance.

        Default is None so the optional steps are not executed.
    energy_edges : list of `~astropy.units.Quantity`, optional
        Edges of the flux points energy bins. The resulting bin edges won't be exactly equal to the input ones,
        but rather the closest values to the energy axis edges of the parent dataset.
        Default is [1, 10] TeV.
    fit : `~gammapy.modeling.Fit`, optional
        Fit instance specifying the backend and fit options. If None, the `~gammapy.modeling.Fit` instance is created
        internally. Default is None.
    reoptimize : bool, optional
        If True, the free parameters of the other models are fitted in each bin independently,
        together with the norm of the source of interest
        (but the other parameters of the source of interest are kept frozen).
        If False, only the norm of the source of interest is fitted,
        and all other parameters are frozen at their current values.
        Default is False.
    sum_over_energy_groups : bool, optional
        Whether to sum over the energy groups or fit the norm on the full energy grid. Default is None.
    n_jobs : int, optional
        Number of processes used in parallel for the computation. The number of jobs is limited to the number of
        physical CPUs. If None, defaults to `~gammapy.utils.parallel.N_JOBS_DEFAULT`.
        Default is None.
    parallel_backend : {"multiprocessing", "ray"}, optional
        Which backend to use for multiprocessing. If None, defaults to `~gammapy.utils.parallel.BACKEND_DEFAULT`.
    norm : `~gammapy.modeling.Parameter` or dict, optional
        Norm parameter used for the fit.
        Default is None and a new parameter is created automatically, with value=1, name="norm",
        scan_min=0.2, scan_max=5, and scan_n_values = 11. By default, the min and max are not set
        (consider setting them if errors or upper limits computation fails). If a dict is given,
        the entries should be a subset of `~gammapy.modeling.Parameter` arguments.
    allow_multiple_telescopes : bool, optional
        Whether to allow the computation for different telescopes.
        **WARNING**: This is currently an experimental feature.
        Default is False.

    Notes
    -----
    - For further explanation, see :ref:`estimators`.
    - In case of failure of upper limits computation (e.g. nan), see the User Guide: :ref:`dropdown-UL`.

    Examples
    --------
    .. testcode::

        from astropy import units as u
        from gammapy.datasets import SpectrumDatasetOnOff
        from gammapy.estimators import FluxPointsEstimator
        from gammapy.modeling.models import PowerLawSpectralModel, SkyModel

        path = "$GAMMAPY_DATA/joint-crab/spectra/hess/"
        dataset = SpectrumDatasetOnOff.read(path + "pha_obs23523.fits")
        pwl = PowerLawSpectralModel(index=2.7, amplitude='3e-11 cm-2 s-1 TeV-1')
        dataset.models = SkyModel(spectral_model=pwl, name="crab")

        estimator = FluxPointsEstimator(
            source="crab",
            energy_edges=[0.1, 0.3, 1, 3, 10, 30, 100] * u.TeV,
        )
        fp = estimator.run(dataset)
        print(fp)

    .. testoutput::

        FluxPoints
        ----------

          geom                   : RegionGeom
          axes                   : ['lon', 'lat', 'energy']
          shape                  : (1, 1, 6)
          quantities             : ['norm', 'norm_err', 'ts', 'npred', 'npred_excess', 'stat', 'stat_null', 'counts', 'success']
          ref. model             : pl
          n_sigma                : 1
          n_sigma_ul             : 2
          sqrt_ts_threshold_ul   : 2
          sed type init          : likelihood
    """

    tag = "FluxPointsEstimator"

    def __init__(
        self,
        energy_edges=[1, 10] * u.TeV,
        sum_over_energy_groups=False,
        n_jobs=None,
        parallel_backend=None,
        allow_multiple_telescopes=False,
        **kwargs,
    ):
        self.energy_edges = energy_edges
        self.sum_over_energy_groups = sum_over_energy_groups
        self.n_jobs = n_jobs
        self.parallel_backend = parallel_backend
        self.allow_multiple_telescopes = allow_multiple_telescopes

        # Use the scipy backend for confidence computations by default; a
        # user-supplied ``fit`` keyword takes precedence via setdefault.
        fit = Fit(confidence_opts={"backend": "scipy"})
        kwargs.setdefault("fit", fit)
        super().__init__(**kwargs)

    def run(self, datasets):
        """Run the flux point estimator for all energy groups.

        Parameters
        ----------
        datasets : `~gammapy.datasets.Datasets`
            Datasets.

        Returns
        -------
        flux_points : `FluxPoints`
            Estimated flux points.
        """
        # A DatasetsActor already wraps a dataset collection (ray backend);
        # anything else is normalized into a Datasets container.
        if not isinstance(datasets, DatasetsActor):
            datasets = Datasets(datasets=datasets)

        if not datasets.energy_axes_are_aligned:
            raise ValueError("All datasets must have aligned energy axes.")

        # Refuse to mix instruments unless explicitly allowed (experimental).
        telescopes = []
        for d in datasets:
            if d.meta_table is not None and "TELESCOP" in d.meta_table.colnames:
                telescopes.extend(list(d.meta_table["TELESCOP"].flatten()))
        if len(np.unique(telescopes)) > 1 and not self.allow_multiple_telescopes:
            raise ValueError(
                "All datasets must use the same value of the 'TELESCOP' meta keyword."
            )

        meta = {
            "n_sigma": self.n_sigma,
            "n_sigma_ul": self.n_sigma_ul,
            "sed_type_init": "likelihood",
        }

        # Each energy bin is fitted independently, so the bins can be
        # distributed over processes: one task per (e_min, e_max) pair.
        rows = parallel.run_multiprocessing(
            self.estimate_flux_point,
            zip(
                repeat(datasets),
                self.energy_edges[:-1],
                self.energy_edges[1:],
            ),
            backend=self.parallel_backend,
            pool_kwargs=dict(processes=self.n_jobs),
            task_name="Energy bins",
        )
        table = Table(rows, meta=meta)

        model = _get_reference_model(datasets.models[self.source], self.energy_edges)

        return FluxPoints.from_table(
            table=table,
            reference_model=model.copy(),
            gti=datasets.gti,
            format="gadf-sed",
        )

    def estimate_flux_point(self, datasets, energy_min, energy_max):
        """Estimate flux point for a single energy group.

        Parameters
        ----------
        datasets : `~gammapy.datasets.Datasets`
            Datasets.
        energy_min, energy_max : `~astropy.units.Quantity`
            Energy bounds to compute the flux point for.

        Returns
        -------
        result : dict
            Dictionary with results for the flux point.
        """
        # Restrict the datasets to the requested energy group; datasets that
        # do not overlap the range are dropped by the slicing.
        datasets_sliced = datasets.slice_by_energy(
            energy_min=energy_min, energy_max=energy_max
        )
        if self.sum_over_energy_groups:
            # Collapse each sliced dataset onto a single energy bin.
            datasets_sliced = datasets_sliced.__class__(
                [_.to_image(name=_.name) for _ in datasets_sliced]
            )

        if len(datasets_sliced) > 0:
            if datasets.models is not None:
                models_sliced = datasets.models._slice_by_energy(
                    energy_min=energy_min,
                    energy_max=energy_max,
                    sum_over_energy_groups=self.sum_over_energy_groups,
                )
                datasets_sliced.models = models_sliced
            # Delegate the single-bin norm fit to FluxEstimator.run.
            return super().run(datasets=datasets_sliced)
        else:
            # No dataset contributes in this range: return a NaN placeholder
            # row so the output table keeps one row per requested bin.
            log.warning(f"No dataset contribute in range {energy_min}-{energy_max}")
            model = _get_reference_model(
                datasets.models[self.source], self.energy_edges
            )
            return self._nan_result(datasets, model, energy_min, energy_max)

    def _nan_result(self, datasets, model, energy_min, energy_max):
        """Build a NaN-filled result row for an energy bin without data."""
        energy_axis = MapAxis.from_energy_edges([energy_min, energy_max])

        with np.errstate(invalid="ignore", divide="ignore"):
            result = model.reference_fluxes(energy_axis=energy_axis)
            # convert to scalar values
            result = {key: value.item() for key, value in result.items()}

        result.update(
            {
                "norm": np.nan,
                "stat": np.nan,
                "success": False,
                "norm_err": np.nan,
                "ts": np.nan,
                "counts": np.zeros(len(datasets)),
                "npred": np.nan * np.zeros(len(datasets)),
                "npred_excess": np.nan * np.zeros(len(datasets)),
                "datasets": datasets.names,
            }
        )

        # Mirror the optional columns produced by a successful fit so that
        # all rows of the final table share the same schema.
        if "errn-errp" in self.selection_optional:
            result.update({"norm_errp": np.nan, "norm_errn": np.nan})

        if "ul" in self.selection_optional:
            result.update({"norm_ul": np.nan})

        if "scan" in self.selection_optional:
            norm_scan = self.norm.copy().scan_values
            result.update({"norm_scan": norm_scan, "stat_scan": np.nan * norm_scan})

        if "sensitivity" in self.selection_optional:
            result.update({"norm_sensitivity": np.nan})

        return result
class FluxCollectionEstimator:
    """Estimate the flux points from a collection of sources simultaneously.

    This estimator computes spectral flux points for a *collection of sources*
    over a set of predefined energy bins. The normalizations of all
    `~gammapy.modeling.models.SkyModel` objects listed in ``models`` are fitted
    jointly in each energy bin, while all other `~gammapy.modeling.models.SkyModel`
    components in the datasets remain frozen. Re-optimization of each dataset's
    background model is optional.

    The operation can be performed either with the standard likelihood optimizer
    (`~gammapy.modeling.Fit`) or with a sampler
    (`~gammapy.modeling.Sampler`), which can be used to derive asymmetric
    errors and upper limits.
    By default, this requires the ultranest package to be installed.

    Parameters
    ----------
    energy_edges : `~astropy.units.Quantity`
        Energy edges of the flux point bins.
    models : `~gammapy.modeling.Models` or list
        Source models for which the flux points are computed (others are frozen).
    n_sigma : float, optional
        Number of sigma to use for asymmetric error computation. Must be a positive value.
        Default is 1.
    n_sigma_ul : float, optional
        Number of sigma to use for upper limit computation. Must be a positive value.
        Default is 2.
    norm : `~gammapy.modeling.Parameter`, optional
        Norm parameter used for the fit.
        Default is None and a new parameter is created with value=1, name="norm".
        If the `solver` is a sampler the default prior is uniform between [-10, 10].
    solver : `~gammapy.modeling.Fit` or `~gammapy.modeling.Sampler`
        Fit or Sampler instance specifying the backend and options.
        Default is a Sampler with options live_points=300, frac_remain=0.3.
    reoptimize : bool
        Whether to reoptimize each dataset.background_model. Default is False.
        Only SkyModel given in `models` will be fitted the others remain frozen,
        regardless of this option.
    selection_optional : list of str, optional
        Which additional quantities to estimate. Available options are:

        * "errn-errp": estimate asymmetric errors on flux.

        Fit solver computes upper limits if sqrt(TS) < n_sigma_ul.
        Sampler solver always compute errn-errp and ul.
    """

    def __init__(
        self,
        energy_edges,
        models,
        n_sigma=1,
        n_sigma_ul=2,
        norm=None,
        solver=None,
        reoptimize=False,
        selection_optional=None,
    ):
        self.n_sigma = n_sigma
        self.n_sigma_ul = n_sigma_ul
        self.models = models
        # Number of sources whose norms are fitted jointly.
        self.ns = len(models)
        # NOTE: ``solver`` must be assigned before ``norm`` below, because
        # the ``_default_norm`` property inspects ``self.solver`` to decide
        # whether to attach a uniform prior.
        self.solver = solver if solver is not None else self._default_solver
        self.reoptimize = reoptimize
        if selection_optional is None:
            selection_optional = []
        self.selection_optional = selection_optional
        self.norm = norm if norm is not None else self._default_norm
        self.energy_edges = energy_edges
        # Units used for the columns of the output flux-point tables.
        self.energy_unit = "TeV"
        self.dnde_unit = u.Unit("cm-2 s-1 TeV-1")

    @property
    def _default_norm(self):
        """Default norm parameter; a uniform prior is added for samplers."""
        prior = None
        if isinstance(self.solver, Sampler):
            prior = UniformPrior(min=-10, max=10)
        return Parameter(name="norm", value=1, unit="", prior=prior)

    @property
    def _default_solver(self):
        """Return an ultranest Sampler with options live_points=300, frac_remain=0.3."""
        return Sampler(
            backend="ultranest",
            sampler_opts={"live_points": 300, "frac_remain": 0.3},
        )

    @property
    def _available_keys(self):
        """Result keys produced for each source, depending on the solver."""
        # TODO: what about 'npred' key
        keys = ["norm", "norm_ul", "ts"]
        if isinstance(self.solver, Fit):
            # Symmetric errors are only available from the likelihood fit.
            keys.append("norm_err")
        if isinstance(self.solver, Sampler) or "errn-errp" in self.selection_optional:
            keys.extend(["norm_errn", "norm_errp"])
        return keys

    def _empty_result_dict(self):
        """Build an empty dictionary to store results."""
        n_sources = len(self.models)
        empty_fp_result = {key: np.zeros(n_sources) for key in self._available_keys}
        empty_fp_result["npred"] = np.zeros(n_sources)
        return empty_fp_result

    @property
    def _energy_axis(self):
        """Energy axis for the input edges."""
        return MapAxis.from_energy_edges(self.energy_edges, name="energy", interp="log")

    def _prepare_dataset(self, dataset, spectral_norm_models):
        """Build NPredTemplateModel for a given dataset.

        Create one single template summing all frozen sources and create one
        template per free source with free norm.
        """
        # TODO: can we avoid the deep copy? Does it remove the cached evaluators?
        fp_dataset = dataset.copy(name=dataset.name)
        source_names = set(Models(self.models).names)
        # Every evaluator not belonging to a fitted source is kept frozen.
        frozen_names = set(dataset._evaluators.keys()) - source_names
        fp_models = []
        npred_free = dataset.npred_signal(source_names, stack=False)
        # One TemplateNPredModel per free source; each shares the norm model
        # passed in via ``spectral_norm_models`` so the norm is fitted jointly
        # across datasets.
        for idx, npred in enumerate(npred_free.split_by_axis("models")):
            name = npred_free.geom.axes["models"].center[idx]
            template_model = TemplateNPredModel(
                npred,
                name=name + "_" + fp_dataset.name,
                spectral_model=spectral_norm_models[name],
                datasets_names=[fp_dataset.name],
            )
            fp_models.append(template_model)
        # All frozen sources are summed into one fixed template.
        npred_frozen = fp_dataset.npred_signal(frozen_names, stack=True)
        bkg_frozen = TemplateNPredModel(
            npred_frozen,
            name="frozen_" + fp_dataset.name,
            datasets_names=[fp_dataset.name],
        )
        bkg_frozen.spectral_model.norm.frozen = True
        bkg_model = self._get_bkg(fp_dataset)
        fp_dataset.models = Models(fp_models + [bkg_frozen] + bkg_model)
        return fp_dataset

    def _prepare_datasets(self, datasets):
        """Define datasets with cached npred models to be renormalized."""
        # One shared norm model per source: the same PowerLawNormSpectralModel
        # instance is attached to that source's template in every dataset.
        norm_models = {}
        for m in self.models:
            norm_models[m.name] = PowerLawNormSpectralModel(norm=self.norm.copy())
            norm_models[m.name].tilt.frozen = True
        fp_datasets = [self._prepare_dataset(d, norm_models) for d in datasets]
        return Datasets(fp_datasets), norm_models

    def _get_bkg(self, d):
        """Return the dataset background model (frozen unless ``reoptimize``)."""
        if d.background_model:
            bkg_model = [d.background_model.copy(name=d.background_model.name)]
            if not self.reoptimize:
                bkg_model[0].freeze()
        else:
            bkg_model = []
        return bkg_model

    @staticmethod
    def _compute_npred(datasets, param, model):
        """Compute npred within the datasets masks."""
        npred = 0
        for kd, d in enumerate(datasets):
            # Template names were built as "<source>_<dataset>" in
            # ``_prepare_dataset``.
            name = model.name + "_" + d.name
            if d.evaluators[name].contributes:
                npred_map = Map.from_geom(d.counts.geom)
                npred_map.stack(d.evaluators[name].compute_npred())
                # Scale by the current norm value and restrict to the mask.
                npred += np.nansum(npred_map.data * d.mask.data) * param.value
        return npred

    @staticmethod
    def _compute_ts(datasets, param):
        """Test statistic against no source as null hypothesis."""
        cash = datasets._stat_sum_likelihood()
        # Temporarily set the norm to zero to evaluate the null hypothesis;
        # the context manager restores the previous value afterwards.
        with Parameters([param]).restore_status():
            param.value = 0
            cash0 = datasets._stat_sum_likelihood()
        return cash0 - cash

    def _run_fit(self, fp_datasets, spectral_models):
        """Compute flux estimates, uncertainties and UL using log-likelihood profile (i.e. using `~gammapy.modeling.Fit`)."""
        fit_results = self.solver.run(fp_datasets)
        fp_result = self._empty_result_dict()
        fp_result["solver_results"] = fit_results

        for km, (m, spec) in enumerate(zip(self.models, spectral_models.values())):
            norm_param = spec.norm
            norm = norm_param.value
            error = norm_param.error

            npred = self._compute_npred(fp_datasets, norm_param, m)
            fp_result["npred"][km] = npred
            fp_result["norm"][km] = norm
            fp_result["norm_err"][km] = error

            if "errn-errp" in self.selection_optional:
                res = self.solver.confidence(
                    datasets=fp_datasets,
                    parameter=norm_param,
                    sigma=self.n_sigma,
                )
                fp_result["norm_errn"][km] = res["errn"]
                fp_result["norm_errp"][km] = res["errp"]

            ts_null = self._compute_ts(fp_datasets, norm_param)
            fp_result["ts"][km] = ts_null
            # Upper limit only when the detection is below the UL threshold
            # (signed sqrt(TS) keeps negative-TS bins below threshold too).
            if np.sign(ts_null) * np.sqrt(np.abs(ts_null)) < self.n_sigma_ul:
                res = self.solver.confidence(
                    datasets=fp_datasets,
                    parameter=norm_param,
                    sigma=self.n_sigma_ul,
                )
                fp_result["norm_ul"][km] = norm + res["errp"]
            else:
                fp_result["norm_ul"][km] = np.nan
        return fp_result

    def _run_sampler(self, fp_datasets, spectral_models):
        """Compute npred, dnde, TS, errn, errp, and ul."""
        sampler_results = self.solver.run(fp_datasets).sampler_results
        fp_result = self._empty_result_dict()
        fp_result["solver_results"] = sampler_results
        points = sampler_results["weighted_samples"]["points"]
        weights = sampler_results["weighted_samples"]["weights"]

        # Percentiles of the weighted posterior matching the requested sigmas.
        quantiles = {
            "norm": 50,
            "norm_errn": 100 * stats.norm.cdf(-self.n_sigma),
            "norm_errp": 100 * stats.norm.cdf(self.n_sigma),
            "norm_ul": 100 * stats.norm.cdf(self.n_sigma_ul),
        }
        for km, (m, spec) in enumerate(zip(self.models, spectral_models.values())):
            # Posterior samples of this source's norm (one column per source).
            samples = points[:, km]
            norm_param = spec.norm
            # NOTE(review): ``np.percentile`` with ``weights=`` requires
            # NumPy >= 2.0 and method="inverted_cdf" — confirm the package
            # minimum NumPy version.
            percentiles = {
                k: np.percentile(samples, q, weights=weights, method="inverted_cdf")
                for k, q in quantiles.items()
            }
            norm = percentiles["norm"]
            norm_param.value = norm  # set before TS computation below
            fp_result["norm"][km] = norm
            fp_result["norm_errn"][km] = norm - percentiles["norm_errn"]
            fp_result["norm_errp"][km] = percentiles["norm_errp"] - norm
            fp_result["norm_ul"][km] = percentiles["norm_ul"]
            fp_result["npred"][km] = self._compute_npred(fp_datasets, norm_param, m)

        # compute TS after norm value is set to median for all models
        for km, spec in enumerate(spectral_models.values()):
            norm_param = spec.norm
            ts_null = self._compute_ts(fp_datasets, norm_param)
            fp_result["ts"][km] = ts_null
        return fp_result

    def run(self, datasets):
        """Compute flux point in each energy band.

        Parameters
        ----------
        datasets : `~gammapy.datasets.Datasets`
            Datasets used to compute the flux points. They must share the same geometry.

        Returns
        -------
        result : dict
            Dict with results
        """
        datasets = Datasets(datasets)
        if len(datasets) == 0:
            raise ValueError("datasets cannot be empty")
        if not datasets.is_all_same_geom:
            raise ValueError("Inconsistent geometries between datasets")
        for d in datasets:
            d.npred()  # precompute npred
        fp_datasets, spectral_models = self._prepare_datasets(datasets)

        fp_results = []
        # One solver run per energy bin; the fit mask is restricted to the
        # bin and restored afterwards by the context manager.
        for ke, (emin, emax) in enumerate(self._energy_axis.iter_by_edges):
            with set_and_restore_mask_fit(
                fp_datasets, energy_min=emin, energy_max=emax
            ):
                if isinstance(self.solver, Sampler):
                    fp_result = self._run_sampler(fp_datasets, spectral_models)
                else:
                    fp_result = self._run_fit(fp_datasets, spectral_models)
                fp_results.append(fp_result)
        return self._get_flux_points_dict(fp_results)

    def _get_flux_points_dict(self, fp_results):
        """Extract flux points for each model from list of results.

        Parameters
        ----------
        fp_results : dict
            dict used to generate the flux points table.

        Returns
        -------
        result : dict
            Dictionary with results.
        """

        def build_fp_from_idx(idx, model):
            # Assemble one gadf-sed table for the source at column ``idx``.
            table = Table()
            table["e_min"] = self._energy_axis.edges_min.to(self.energy_unit)
            table["e_max"] = self._energy_axis.edges_max.to(self.energy_unit)
            table["e_ref"] = self._energy_axis.center.to(self.energy_unit)
            table["ref_dnde"] = model(table["e_ref"]).to(self.dnde_unit)
            for key in self._available_keys:
                table[key] = np.array([fp[key][idx] for fp in fp_results])
            table.meta["SED_TYPE"] = "likelihood"
            return FluxPoints.from_table(
                table, reference_model=model.copy(), format="gadf-sed"
            )

        fp_dict = dict(
            energy_edges=self.energy_edges,
            solver_results=np.array(
                [fp["solver_results"] for fp in fp_results], dtype=object
            ),
            flux_points={},
        )
        for idx, m in enumerate(self.models):
            model = _get_reference_model(m, self.energy_edges)
            fp_dict["flux_points"][m.name] = build_fp_from_idx(idx, model)

        if isinstance(self.solver, Sampler):
            # Also expose per-source posterior dnde samples so users can build
            # their own credible intervals.
            weights = [
                fp["solver_results"]["weighted_samples"]["weights"] for fp in fp_results
            ]
            dnde_dict = {}
            for model_idx, m in enumerate(self.models):
                dnde_dict[m.name] = []
                dnde_ref = fp_dict["flux_points"][m.name]["dnde_ref"].squeeze()
                for dnde, fp in zip(dnde_ref, fp_results):
                    points = fp["solver_results"]["weighted_samples"]["points"]
                    # dnde samples = reference dnde scaled by the norm samples.
                    dnde_dict[m.name].append(dnde * points[:, model_idx])
            fp_dict["samples"] = dict(dnde=dnde_dict, weights=weights)
        return fp_dict